Merge "Convert project tests into mock: fip/sg/key_pairs/topology"
This commit is contained in:
commit
c6c18b1aca
@ -17,11 +17,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from django import http
|
||||
from django.urls import reverse
|
||||
from django.utils.http import urlencode
|
||||
|
||||
from mox3.mox import IsA
|
||||
import mock
|
||||
import six
|
||||
|
||||
from openstack_dashboard import api
|
||||
@ -37,14 +36,13 @@ NAMESPACE = "horizon:project:floating_ips"
|
||||
|
||||
class FloatingIpViewTests(test.TestCase):
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_target_list',
|
||||
'tenant_floating_ip_list',)})
|
||||
@test.create_mocks({api.neutron: ('floating_ip_target_list',
|
||||
'tenant_floating_ip_list')})
|
||||
def test_associate(self):
|
||||
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self._get_fip_targets())
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
self.mox.ReplayAll()
|
||||
self.mock_floating_ip_target_list.return_value = \
|
||||
self._get_fip_targets()
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
|
||||
url = reverse('%s:associate' % NAMESPACE)
|
||||
res = self.client.get(url)
|
||||
@ -54,17 +52,19 @@ class FloatingIpViewTests(test.TestCase):
|
||||
# Verify that our "associated" floating IP isn't in the choices list.
|
||||
self.assertNotIn(self.floating_ips.first(), choices)
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_target_list_by_instance',
|
||||
'tenant_floating_ip_list',)})
|
||||
self.mock_floating_ip_target_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('floating_ip_target_list_by_instance',
|
||||
'tenant_floating_ip_list')})
|
||||
def test_associate_with_instance_id(self):
|
||||
targets = self._get_fip_targets()
|
||||
target = targets[0]
|
||||
api.neutron.floating_ip_target_list_by_instance(
|
||||
IsA(http.HttpRequest), target.instance_id) \
|
||||
.AndReturn([target])
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
self.mox.ReplayAll()
|
||||
self.mock_floating_ip_target_list_by_instance.return_value = [target]
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
|
||||
base_url = reverse('%s:associate' % NAMESPACE)
|
||||
params = urlencode({'instance_id': target.instance_id})
|
||||
@ -76,6 +76,11 @@ class FloatingIpViewTests(test.TestCase):
|
||||
# Verify that our "associated" floating IP isn't in the choices list.
|
||||
self.assertNotIn(self.floating_ips.first(), choices)
|
||||
|
||||
self.mock_floating_ip_target_list_by_instance.assert_called_once_with(
|
||||
test.IsHttpRequest(), target.instance_id)
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
def _get_compute_ports(self):
|
||||
return [p for p in self.ports.list()
|
||||
if not p.device_owner.startswith('network:')]
|
||||
@ -93,18 +98,17 @@ class FloatingIpViewTests(test.TestCase):
|
||||
def _get_target_id(port):
|
||||
return '%s_%s' % (port.id, port.fixed_ips[0]['ip_address'])
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_target_list',
|
||||
'tenant_floating_ip_list',)})
|
||||
@test.create_mocks({api.neutron: ('floating_ip_target_list',
|
||||
'tenant_floating_ip_list')})
|
||||
def test_associate_with_port_id(self):
|
||||
compute_port = self._get_compute_ports()[0]
|
||||
associated_fips = [fip.id for fip in self.floating_ips.list()
|
||||
if fip.port_id]
|
||||
|
||||
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self._get_fip_targets())
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
self.mox.ReplayAll()
|
||||
self.mock_floating_ip_target_list.return_value = \
|
||||
self._get_fip_targets()
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
|
||||
base_url = reverse('%s:associate' % NAMESPACE)
|
||||
params = urlencode({'port_id': compute_port.id})
|
||||
@ -116,23 +120,25 @@ class FloatingIpViewTests(test.TestCase):
|
||||
# Verify that our "associated" floating IP isn't in the choices list.
|
||||
self.assertFalse(set(associated_fips) & set(choices.keys()))
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_associate',
|
||||
self.mock_floating_ip_target_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('floating_ip_associate',
|
||||
'floating_ip_target_list',
|
||||
'tenant_floating_ip_list',)})
|
||||
'tenant_floating_ip_list')})
|
||||
def test_associate_post(self):
|
||||
floating_ip = [fip for fip in self.floating_ips.list()
|
||||
if not fip.port_id][0]
|
||||
compute_port = self._get_compute_ports()[0]
|
||||
port_target_id = self._get_target_id(compute_port)
|
||||
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self._get_fip_targets())
|
||||
api.neutron.floating_ip_associate(IsA(http.HttpRequest),
|
||||
floating_ip.id,
|
||||
port_target_id)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
self.mock_floating_ip_target_list.return_value = \
|
||||
self._get_fip_targets()
|
||||
self.mock_floating_ip_associate.return_value = None
|
||||
|
||||
form_data = {'instance_id': port_target_id,
|
||||
'ip_id': floating_ip.id}
|
||||
@ -140,23 +146,28 @@ class FloatingIpViewTests(test.TestCase):
|
||||
res = self.client.post(url, form_data)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_associate',
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_target_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_associate.assert_called_once_with(
|
||||
test.IsHttpRequest(), floating_ip.id, port_target_id)
|
||||
|
||||
@test.create_mocks({api.neutron: ('floating_ip_associate',
|
||||
'floating_ip_target_list',
|
||||
'tenant_floating_ip_list',)})
|
||||
'tenant_floating_ip_list')})
|
||||
def test_associate_post_with_redirect(self):
|
||||
floating_ip = [fip for fip in self.floating_ips.list()
|
||||
if not fip.port_id][0]
|
||||
compute_port = self._get_compute_ports()[0]
|
||||
port_target_id = self._get_target_id(compute_port)
|
||||
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self._get_fip_targets())
|
||||
api.neutron.floating_ip_associate(IsA(http.HttpRequest),
|
||||
floating_ip.id,
|
||||
port_target_id)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
self.mock_floating_ip_target_list.return_value = \
|
||||
self._get_fip_targets()
|
||||
self.mock_floating_ip_associate.return_value = None
|
||||
|
||||
next = reverse("horizon:project:instances:index")
|
||||
form_data = {'instance_id': port_target_id,
|
||||
'next': next,
|
||||
@ -165,24 +176,27 @@ class FloatingIpViewTests(test.TestCase):
|
||||
res = self.client.post(url, form_data)
|
||||
self.assertRedirectsNoFollow(res, next)
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_associate',
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_target_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_associate.assert_called_once_with(
|
||||
test.IsHttpRequest(), floating_ip.id, port_target_id)
|
||||
|
||||
@test.create_mocks({api.neutron: ('floating_ip_associate',
|
||||
'floating_ip_target_list',
|
||||
'tenant_floating_ip_list',)})
|
||||
'tenant_floating_ip_list')})
|
||||
def test_associate_post_with_exception(self):
|
||||
floating_ip = [fip for fip in self.floating_ips.list()
|
||||
if not fip.port_id][0]
|
||||
compute_port = self._get_compute_ports()[0]
|
||||
port_target_id = self._get_target_id(compute_port)
|
||||
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self._get_fip_targets())
|
||||
api.neutron.floating_ip_associate(IsA(http.HttpRequest),
|
||||
floating_ip.id,
|
||||
port_target_id) \
|
||||
.AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
self.mock_floating_ip_target_list.return_value = \
|
||||
self._get_fip_targets()
|
||||
self.mock_floating_ip_associate.side_effect = self.exceptions.nova
|
||||
|
||||
form_data = {'instance_id': port_target_id,
|
||||
'ip_id': floating_ip.id}
|
||||
@ -190,78 +204,79 @@ class FloatingIpViewTests(test.TestCase):
|
||||
res = self.client.post(url, form_data)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
|
||||
@test.create_stubs({api.nova: ('server_list',),
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_target_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_associate.assert_called_once_with(
|
||||
test.IsHttpRequest(), floating_ip.id, port_target_id)
|
||||
|
||||
@test.create_mocks({api.nova: ('server_list',),
|
||||
api.neutron: ('floating_ip_disassociate',
|
||||
'floating_ip_pools_list',
|
||||
'tenant_floating_ip_get',
|
||||
'tenant_floating_ip_list',
|
||||
'is_extension_supported',)})
|
||||
'tenant_floating_ip_list')})
|
||||
def test_disassociate_post(self):
|
||||
floating_ip = self.floating_ips.first()
|
||||
|
||||
api.nova.server_list(IsA(http.HttpRequest), detailed=False) \
|
||||
.AndReturn([self.servers.list(), False])
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
api.neutron.floating_ip_pools_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.pools.list())
|
||||
api.neutron.floating_ip_disassociate(IsA(http.HttpRequest),
|
||||
floating_ip.id)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_server_list.return_value = [self.servers.list(), False]
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
self.mock_floating_ip_pools_list.return_value = self.pools.list()
|
||||
self.mock_floating_ip_disassociate.return_value = None
|
||||
|
||||
action = "floating_ips__disassociate__%s" % floating_ip.id
|
||||
res = self.client.post(INDEX_URL, {"action": action})
|
||||
self.assertMessageCount(success=1)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
|
||||
@test.create_stubs({api.nova: ('server_list',),
|
||||
self.mock_server_list.assert_called_once_with(test.IsHttpRequest(),
|
||||
detailed=False)
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_pools_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_disassociate.assert_called_once_with(
|
||||
test.IsHttpRequest(), floating_ip.id)
|
||||
|
||||
@test.create_mocks({api.nova: ('server_list',),
|
||||
api.neutron: ('floating_ip_disassociate',
|
||||
'floating_ip_pools_list',
|
||||
'tenant_floating_ip_get',
|
||||
'tenant_floating_ip_list',
|
||||
'is_extension_supported',)})
|
||||
'tenant_floating_ip_list')})
|
||||
def test_disassociate_post_with_exception(self):
|
||||
floating_ip = self.floating_ips.first()
|
||||
|
||||
api.nova.server_list(IsA(http.HttpRequest), detailed=False) \
|
||||
.AndReturn([self.servers.list(), False])
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.floating_ips.list())
|
||||
api.neutron.floating_ip_pools_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.pools.list())
|
||||
api.neutron.floating_ip_disassociate(IsA(http.HttpRequest),
|
||||
floating_ip.id) \
|
||||
.AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_server_list.return_value = [self.servers.list(), False]
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
self.mock_floating_ip_pools_list.return_value = self.pools.list()
|
||||
self.mock_floating_ip_disassociate.side_effect = self.exceptions.nova
|
||||
|
||||
action = "floating_ips__disassociate__%s" % floating_ip.id
|
||||
res = self.client.post(INDEX_URL, {"action": action})
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
|
||||
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
|
||||
'floating_ip_pools_list',),
|
||||
self.mock_server_list.assert_called_once_with(test.IsHttpRequest(),
|
||||
detailed=False)
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_pools_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_disassociate.assert_called_once_with(
|
||||
test.IsHttpRequest(), floating_ip.id)
|
||||
|
||||
@test.create_mocks({api.neutron: ('tenant_floating_ip_list',
|
||||
'floating_ip_pools_list'),
|
||||
api.nova: ('server_list',),
|
||||
quotas: ('tenant_quota_usages',),
|
||||
api.base: ('is_service_enabled',)})
|
||||
quotas: ('tenant_quota_usages',)})
|
||||
def test_allocate_button_attributes(self):
|
||||
floating_ips = self.floating_ips.list()
|
||||
floating_pools = self.pools.list()
|
||||
quota_data = self.neutron_quota_usages.first()
|
||||
|
||||
api.neutron.tenant_floating_ip_list(
|
||||
IsA(http.HttpRequest)) \
|
||||
.AndReturn(floating_ips)
|
||||
api.neutron.floating_ip_pools_list(
|
||||
IsA(http.HttpRequest)) \
|
||||
.AndReturn(floating_pools)
|
||||
api.nova.server_list(
|
||||
IsA(http.HttpRequest), detailed=False) \
|
||||
.AndReturn([self.servers.list(), False])
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest), targets=('floatingip', )).MultipleTimes() \
|
||||
.AndReturn(quota_data)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_tenant_floating_ip_list.return_value = floating_ips
|
||||
self.mock_floating_ip_pools_list.return_value = floating_pools
|
||||
self.mock_server_list.return_value = [self.servers.list(), False]
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
|
||||
res = self.client.get(INDEX_URL)
|
||||
|
||||
@ -275,31 +290,30 @@ class FloatingIpViewTests(test.TestCase):
|
||||
url = 'horizon:project:floating_ips:allocate'
|
||||
self.assertEqual(url, allocate_action.url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
|
||||
'floating_ip_pools_list',),
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_pools_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_server_list.assert_called_once_with(test.IsHttpRequest(),
|
||||
detailed=False)
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_tenant_quota_usages, 3,
|
||||
mock.call(test.IsHttpRequest(), targets=('floatingip', )))
|
||||
|
||||
@test.create_mocks({api.neutron: ('tenant_floating_ip_list',
|
||||
'floating_ip_pools_list'),
|
||||
api.nova: ('server_list',),
|
||||
quotas: ('tenant_quota_usages',),
|
||||
api.base: ('is_service_enabled',)})
|
||||
quotas: ('tenant_quota_usages',)})
|
||||
def test_allocate_button_disabled_when_quota_exceeded(self):
|
||||
floating_ips = self.floating_ips.list()
|
||||
floating_pools = self.pools.list()
|
||||
quota_data = self.neutron_quota_usages.first()
|
||||
quota_data['floatingip']['available'] = 0
|
||||
|
||||
api.neutron.tenant_floating_ip_list(
|
||||
IsA(http.HttpRequest)) \
|
||||
.AndReturn(floating_ips)
|
||||
api.neutron.floating_ip_pools_list(
|
||||
IsA(http.HttpRequest)) \
|
||||
.AndReturn(floating_pools)
|
||||
api.nova.server_list(
|
||||
IsA(http.HttpRequest), detailed=False) \
|
||||
.AndReturn([self.servers.list(), False])
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest), targets=('floatingip', )).MultipleTimes() \
|
||||
.AndReturn(quota_data)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_tenant_floating_ip_list.return_value = floating_ips
|
||||
self.mock_floating_ip_pools_list.return_value = floating_pools
|
||||
self.mock_server_list.return_value = [self.servers.list(), False]
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
|
||||
res = self.client.get(INDEX_URL)
|
||||
|
||||
@ -310,8 +324,17 @@ class FloatingIpViewTests(test.TestCase):
|
||||
self.assertEqual('Allocate IP To Project (Quota exceeded)',
|
||||
six.text_type(allocate_action.verbose_name))
|
||||
|
||||
@test.create_stubs({api.neutron: ('floating_ip_pools_list',
|
||||
'floating_ip_supported',
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_pools_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_server_list.assert_called_once_with(test.IsHttpRequest(),
|
||||
detailed=False)
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_tenant_quota_usages, 3,
|
||||
mock.call(test.IsHttpRequest(), targets=('floatingip', )))
|
||||
|
||||
@test.create_mocks({api.neutron: ('floating_ip_pools_list',
|
||||
'tenant_floating_ip_list',
|
||||
'is_extension_supported',
|
||||
'is_router_enabled',
|
||||
@ -320,29 +343,38 @@ class FloatingIpViewTests(test.TestCase):
|
||||
api.cinder: ('is_volume_service_enabled',)})
|
||||
@test.update_settings(OPENSTACK_NEUTRON_NETWORK={'enable_quotas': True})
|
||||
def test_correct_quotas_displayed(self):
|
||||
api.cinder.is_volume_service_enabled(IsA(http.HttpRequest)) \
|
||||
.AndReturn(False)
|
||||
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
|
||||
.MultipleTimes().AndReturn(True)
|
||||
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
|
||||
.MultipleTimes().AndReturn(True)
|
||||
api.neutron.is_extension_supported(
|
||||
IsA(http.HttpRequest), 'security-group').AndReturn(True)
|
||||
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'quotas') \
|
||||
.AndReturn(True)
|
||||
api.neutron.is_router_enabled(IsA(http.HttpRequest)) \
|
||||
.AndReturn(True)
|
||||
api.neutron.is_extension_supported(IsA(http.HttpRequest),
|
||||
'quota_details').AndReturn(False)
|
||||
api.neutron.tenant_quota_get(IsA(http.HttpRequest), self.tenant.id) \
|
||||
.AndReturn(self.neutron_quotas.first())
|
||||
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||
.MultipleTimes().AndReturn(self.floating_ips.list())
|
||||
api.neutron.floating_ip_pools_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.pools.list())
|
||||
self.mox.ReplayAll()
|
||||
self.mock_is_volume_service_enabled.return_value = False
|
||||
self.mock_is_service_enabled.side_effect = [True, True]
|
||||
self.mock_is_extension_supported.side_effect = [True, True, False]
|
||||
self.mock_is_router_enabled.return_value = True
|
||||
self.mock_tenant_quota_get.return_value = self.neutron_quotas.first()
|
||||
self.mock_tenant_floating_ip_list.return_value = \
|
||||
self.floating_ips.list()
|
||||
self.mock_floating_ip_pools_list.return_value = self.pools.list()
|
||||
|
||||
url = reverse('%s:allocate' % NAMESPACE)
|
||||
res = self.client.get(url)
|
||||
self.assertEqual(res.context['usages']['floatingip']['quota'],
|
||||
self.neutron_quotas.first().get('floatingip').limit)
|
||||
|
||||
self.mock_is_volume_service_enabled.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.assertEqual(2, self.mock_is_service_enabled.call_count)
|
||||
self.mock_is_service_enabled.assert_has_calls([
|
||||
mock.call(test.IsHttpRequest(), 'network'),
|
||||
mock.call(test.IsHttpRequest(), 'compute'),
|
||||
])
|
||||
self.assertEqual(3, self.mock_is_extension_supported.call_count)
|
||||
self.mock_is_extension_supported.assert_has_calls([
|
||||
mock.call(test.IsHttpRequest(), 'security-group'),
|
||||
mock.call(test.IsHttpRequest(), 'quotas'),
|
||||
mock.call(test.IsHttpRequest(), 'quota_details'),
|
||||
])
|
||||
self.mock_is_router_enabled.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_tenant_quota_get.assert_called_once_with(
|
||||
test.IsHttpRequest(), self.tenant.id)
|
||||
self.mock_tenant_floating_ip_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_floating_ip_pools_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
@ -16,9 +16,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from django import http
|
||||
from django.urls import reverse
|
||||
from mox3.mox import IsA
|
||||
import mock
|
||||
import six
|
||||
|
||||
from openstack_dashboard import api
|
||||
@ -32,61 +31,63 @@ INDEX_URL = reverse('horizon:project:key_pairs:index')
|
||||
|
||||
|
||||
class KeyPairTests(test.TestCase):
|
||||
@test.create_stubs({
|
||||
api.nova: ('keypair_list',),
|
||||
quotas: ('tenant_quota_usages',),
|
||||
})
|
||||
@test.create_mocks({api.nova: ('keypair_list',),
|
||||
quotas: ('tenant_quota_usages',)})
|
||||
def test_index(self):
|
||||
keypairs = self.keypairs.list()
|
||||
quota_data = self.quota_usages.first()
|
||||
|
||||
quotas.tenant_quota_usages(IsA(http.HttpRequest),
|
||||
targets=('key_pairs', )).MultipleTimes() \
|
||||
.AndReturn(quota_data)
|
||||
api.nova.keypair_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(keypairs)
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
self.mock_keypair_list.return_value = keypairs
|
||||
|
||||
self.mox.ReplayAll()
|
||||
res = self.client.get(INDEX_URL)
|
||||
|
||||
self.assertTemplateUsed(res, 'horizon/common/_data_table_view.html')
|
||||
self.assertItemsEqual(res.context['keypairs_table'].data, keypairs)
|
||||
|
||||
@test.create_stubs({api.nova: ('keypair_list', 'keypair_delete')})
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_tenant_quota_usages, 4,
|
||||
mock.call(test.IsHttpRequest(), targets=('key_pairs', )))
|
||||
self.mock_keypair_list.assert_called_once_with(test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.nova: ('keypair_list',
|
||||
'keypair_delete')})
|
||||
def test_delete_keypair(self):
|
||||
keypair = self.keypairs.first()
|
||||
|
||||
api.nova.keypair_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.keypairs.list())
|
||||
api.nova.keypair_delete(IsA(http.HttpRequest), keypair.name)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_keypair_list.return_value = self.keypairs.list()
|
||||
self.mock_keypair_delete.return_value = None
|
||||
|
||||
formData = {'action': 'keypairs__delete__%s' % keypair.name}
|
||||
res = self.client.post(INDEX_URL, formData)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
|
||||
@test.create_stubs({api.nova: ('keypair_list', 'keypair_delete')})
|
||||
self.mock_keypair_list.assert_called_once_with(test.IsHttpRequest())
|
||||
self.mock_keypair_delete.assert_called_once_with(test.IsHttpRequest(),
|
||||
keypair.name)
|
||||
|
||||
@test.create_mocks({api.nova: ('keypair_list',
|
||||
'keypair_delete')})
|
||||
def test_delete_keypair_exception(self):
|
||||
keypair = self.keypairs.first()
|
||||
|
||||
api.nova.keypair_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(self.keypairs.list())
|
||||
api.nova.keypair_delete(IsA(http.HttpRequest), keypair.name) \
|
||||
.AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_keypair_list.return_value = self.keypairs.list()
|
||||
self.mock_keypair_delete.side_effect = self.exceptions.nova
|
||||
|
||||
formData = {'action': 'keypairs__delete__%s' % keypair.name}
|
||||
res = self.client.post(INDEX_URL, formData)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
|
||||
@test.create_stubs({api.nova: ('keypair_get',)})
|
||||
self.mock_keypair_list.assert_called_once_with(test.IsHttpRequest())
|
||||
self.mock_keypair_delete.assert_called_once_with(test.IsHttpRequest(),
|
||||
keypair.name)
|
||||
|
||||
@test.create_mocks({api.nova: ('keypair_get',)})
|
||||
def test_keypair_detail_get(self):
|
||||
keypair = self.keypairs.first()
|
||||
keypair.private_key = "secret"
|
||||
|
||||
api.nova.keypair_get(IsA(http.HttpRequest),
|
||||
keypair.name).AndReturn(keypair)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_keypair_get.return_value = keypair
|
||||
|
||||
context = {'keypair_name': keypair.name}
|
||||
url = reverse('horizon:project:key_pairs:detail',
|
||||
@ -94,15 +95,16 @@ class KeyPairTests(test.TestCase):
|
||||
res = self.client.get(url, context)
|
||||
self.assertContains(res, "<dd>%s</dd>" % keypair.name, 1, 200)
|
||||
|
||||
@test.create_stubs({api.nova: ("keypair_import",)})
|
||||
self.mock_keypair_get.assert_called_once_with(test.IsHttpRequest(),
|
||||
keypair.name)
|
||||
|
||||
@test.create_mocks({api.nova: ('keypair_import',)})
|
||||
def test_import_keypair(self):
|
||||
key1_name = "new_key_pair"
|
||||
public_key = "ssh-rsa ABCDEFGHIJKLMNOPQR\r\n" \
|
||||
"STUVWXYZ1234567890\r" \
|
||||
"XXYYZZ user@computer\n\n"
|
||||
api.nova.keypair_import(IsA(http.HttpRequest), key1_name,
|
||||
public_key.replace("\r", "").replace("\n", ""))
|
||||
self.mox.ReplayAll()
|
||||
self.mock_keypair_import.return_value = None
|
||||
|
||||
formData = {'method': 'ImportKeypair',
|
||||
'name': key1_name,
|
||||
@ -111,14 +113,16 @@ class KeyPairTests(test.TestCase):
|
||||
res = self.client.post(url, formData)
|
||||
self.assertMessageCount(res, success=1)
|
||||
|
||||
@test.create_stubs({api.nova: ("keypair_import",)})
|
||||
self.mock_keypair_import.assert_called_once_with(
|
||||
test.IsHttpRequest(), key1_name,
|
||||
public_key.replace("\r", "").replace("\n", ""))
|
||||
|
||||
@test.create_mocks({api.nova: ('keypair_import',)})
|
||||
def test_import_keypair_invalid_key(self):
|
||||
key_name = "new_key_pair"
|
||||
public_key = "ABCDEF"
|
||||
|
||||
api.nova.keypair_import(IsA(http.HttpRequest), key_name, public_key) \
|
||||
.AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_keypair_import.side_effect = self.exceptions.nova
|
||||
|
||||
formData = {'method': 'ImportKeypair',
|
||||
'name': key_name,
|
||||
@ -129,6 +133,9 @@ class KeyPairTests(test.TestCase):
|
||||
msg = 'Unable to import key pair.'
|
||||
self.assertFormErrors(res, count=1, message=msg)
|
||||
|
||||
self.mock_keypair_import.assert_called_once_with(
|
||||
test.IsHttpRequest(), key_name, public_key)
|
||||
|
||||
def test_import_keypair_invalid_key_name(self):
|
||||
key_name = "invalid#key?name=!"
|
||||
public_key = "ABCDEF"
|
||||
@ -155,15 +162,13 @@ class KeyPairTests(test.TestCase):
|
||||
msg = six.text_type(KEYPAIR_ERROR_MESSAGES['invalid'])
|
||||
self.assertFormErrors(res, count=1, message=msg)
|
||||
|
||||
@test.create_stubs({api.nova: ("keypair_import",)})
|
||||
@test.create_mocks({api.nova: ('keypair_import',)})
|
||||
def test_import_keypair_with_regex_defined_name(self):
|
||||
key1_name = "new-key-pair with_regex"
|
||||
public_key = "ssh-rsa ABCDEFGHIJKLMNOPQR\r\n" \
|
||||
"STUVWXYZ1234567890\r" \
|
||||
"XXYYZZ user@computer\n\n"
|
||||
api.nova.keypair_import(IsA(http.HttpRequest), key1_name,
|
||||
public_key.replace("\r", "").replace("\n", ""))
|
||||
self.mox.ReplayAll()
|
||||
self.mock_keypair_import.return_value = None
|
||||
|
||||
formData = {'method': 'ImportKeypair',
|
||||
'name': key1_name,
|
||||
@ -171,3 +176,7 @@ class KeyPairTests(test.TestCase):
|
||||
url = reverse('horizon:project:key_pairs:import')
|
||||
res = self.client.post(url, formData)
|
||||
self.assertMessageCount(res, success=1)
|
||||
|
||||
self.mock_keypair_import.assert_called_once_with(
|
||||
test.IsHttpRequest(), key1_name,
|
||||
public_key.replace("\r", "").replace("\n", ""))
|
||||
|
@ -12,16 +12,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from django import http
|
||||
import django.test
|
||||
from django.urls import reverse
|
||||
|
||||
from mox3.mox import IsA
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from openstack_dashboard import api
|
||||
from openstack_dashboard.dashboards.project.network_topology.views import \
|
||||
TranslationHelper
|
||||
from openstack_dashboard.dashboards.project.network_topology import views
|
||||
from openstack_dashboard.test import helpers as test
|
||||
from openstack_dashboard.usage import quotas
|
||||
|
||||
@ -30,9 +28,9 @@ INDEX_URL = reverse('horizon:project:network_topology:index')
|
||||
|
||||
|
||||
class NetworkTopologyTests(test.TestCase):
|
||||
trans = TranslationHelper()
|
||||
trans = views.TranslationHelper()
|
||||
|
||||
@test.create_stubs({api.nova: ('server_list',),
|
||||
@test.create_mocks({api.nova: ('server_list',),
|
||||
api.neutron: ('network_list_for_tenant',
|
||||
'network_list',
|
||||
'router_list',
|
||||
@ -42,14 +40,14 @@ class NetworkTopologyTests(test.TestCase):
|
||||
|
||||
@django.test.utils.override_settings(
|
||||
OPENSTACK_NEUTRON_NETWORK={'enable_router': False})
|
||||
@test.create_stubs({api.nova: ('server_list',),
|
||||
@test.create_mocks({api.nova: ('server_list',),
|
||||
api.neutron: ('network_list_for_tenant',
|
||||
'port_list')})
|
||||
def test_json_view_router_disabled(self):
|
||||
self._test_json_view(router_enable=False)
|
||||
|
||||
@django.test.utils.override_settings(CONSOLE_TYPE=None)
|
||||
@test.create_stubs({api.nova: ('server_list',),
|
||||
@test.create_mocks({api.nova: ('server_list',),
|
||||
api.neutron: ('network_list_for_tenant',
|
||||
'network_list',
|
||||
'router_list',
|
||||
@ -58,33 +56,22 @@ class NetworkTopologyTests(test.TestCase):
|
||||
self._test_json_view(with_console=False)
|
||||
|
||||
def _test_json_view(self, router_enable=True, with_console=True):
|
||||
api.nova.server_list(
|
||||
IsA(http.HttpRequest)).AndReturn([self.servers.list(), False])
|
||||
self.mock_server_list.return_value = [self.servers.list(), False]
|
||||
|
||||
tenant_networks = [net for net in self.networks.list()
|
||||
if not net['router:external']]
|
||||
external_networks = [net for net in self.networks.list()
|
||||
if net['router:external']]
|
||||
api.neutron.network_list_for_tenant(
|
||||
IsA(http.HttpRequest),
|
||||
self.tenant.id).AndReturn(tenant_networks)
|
||||
self.mock_network_list_for_tenant.return_value = tenant_networks
|
||||
|
||||
# router1 : gateway port not in the port list
|
||||
# router2 : no gateway port
|
||||
# router3 : gateway port included in port list
|
||||
routers = self.routers.list() + self.routers_with_rules.list()
|
||||
if router_enable:
|
||||
api.neutron.router_list(
|
||||
IsA(http.HttpRequest),
|
||||
tenant_id=self.tenant.id).AndReturn(routers)
|
||||
api.neutron.port_list(
|
||||
IsA(http.HttpRequest)).AndReturn(self.ports.list())
|
||||
if router_enable:
|
||||
api.neutron.network_list(
|
||||
IsA(http.HttpRequest),
|
||||
**{'router:external': True}).AndReturn(external_networks)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_router_list.return_value = routers
|
||||
self.mock_network_list.return_value = external_networks
|
||||
self.mock_port_list.return_value = self.ports.list()
|
||||
|
||||
res = self.client.get(JSON_URL)
|
||||
|
||||
@ -92,8 +79,6 @@ class NetworkTopologyTests(test.TestCase):
|
||||
data = jsonutils.loads(res.content)
|
||||
|
||||
# servers
|
||||
# result_server_urls = [(server['id'], server['url'])
|
||||
# for server in data['servers']]
|
||||
expect_server_urls = []
|
||||
for server in self.servers.list():
|
||||
expect_server = {
|
||||
@ -110,8 +95,6 @@ class NetworkTopologyTests(test.TestCase):
|
||||
self.assertEqual(expect_server_urls, data['servers'])
|
||||
|
||||
# routers
|
||||
# result_router_urls = [(router['id'], router['url'])
|
||||
# for router in data['routers']]
|
||||
if router_enable:
|
||||
expect_router_urls = [
|
||||
{'id': router.id,
|
||||
@ -187,28 +170,30 @@ class NetworkTopologyTests(test.TestCase):
|
||||
'fixed_ips': []})
|
||||
self.assertEqual(expect_port_urls, data['ports'])
|
||||
|
||||
self.mock_server_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.mock_network_list_for_tenant.assert_called_once_with(
|
||||
test.IsHttpRequest(), self.tenant.id)
|
||||
if router_enable:
|
||||
self.mock_router_list.assert_called_once_with(
|
||||
test.IsHttpRequest(), tenant_id=self.tenant.id)
|
||||
self.mock_network_list.assert_called_once_with(
|
||||
test.IsHttpRequest(), **{'router:external': True})
|
||||
self.mock_port_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
|
||||
class NetworkTopologyCreateTests(test.TestCase):
|
||||
|
||||
def _test_new_button_disabled_when_quota_exceeded(
|
||||
self, expected_string, networks_quota=10,
|
||||
routers_quota=10, instances_quota=10):
|
||||
self, expected_string,
|
||||
networks_quota=10, routers_quota=10, instances_quota=10):
|
||||
quota_data = self.quota_usages.first()
|
||||
quota_data['network']['available'] = networks_quota
|
||||
quota_data['router']['available'] = routers_quota
|
||||
quota_data['instances']['available'] = instances_quota
|
||||
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest), targets=('instances', )
|
||||
).MultipleTimes().AndReturn(quota_data)
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest), targets=('network', )
|
||||
).MultipleTimes().AndReturn(quota_data)
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest), targets=('router', )
|
||||
).MultipleTimes().AndReturn(quota_data)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
|
||||
res = self.client.get(INDEX_URL)
|
||||
self.assertTemplateUsed(res, 'project/network_topology/index.html')
|
||||
@ -216,7 +201,13 @@ class NetworkTopologyCreateTests(test.TestCase):
|
||||
self.assertContains(res, expected_string, html=True,
|
||||
msg_prefix="The create button is not disabled")
|
||||
|
||||
@test.create_stubs({quotas: ('tenant_quota_usages',)})
|
||||
self.mock_tenant_quota_usages.assert_has_calls([
|
||||
mock.call(test.IsHttpRequest(), targets=('instances', )),
|
||||
mock.call(test.IsHttpRequest(), targets=('network', )),
|
||||
mock.call(test.IsHttpRequest(), targets=('router', )),
|
||||
] * 3)
|
||||
|
||||
@test.create_mocks({quotas: ('tenant_quota_usages',)})
|
||||
def test_create_network_button_disabled_when_quota_exceeded(self):
|
||||
url = reverse('horizon:project:network_topology:createnetwork')
|
||||
classes = 'btn btn-default ajax-modal'
|
||||
@ -226,10 +217,10 @@ class NetworkTopologyCreateTests(test.TestCase):
|
||||
"<span class='fa fa-plus'></span>%s</a>" \
|
||||
% (url, classes, link_name)
|
||||
|
||||
self._test_new_button_disabled_when_quota_exceeded(
|
||||
expected_string, networks_quota=0)
|
||||
self._test_new_button_disabled_when_quota_exceeded(expected_string,
|
||||
networks_quota=0)
|
||||
|
||||
@test.create_stubs({quotas: ('tenant_quota_usages',)})
|
||||
@test.create_mocks({quotas: ('tenant_quota_usages',)})
|
||||
def test_create_router_button_disabled_when_quota_exceeded(self):
|
||||
url = reverse('horizon:project:network_topology:createrouter')
|
||||
classes = 'btn btn-default ajax-modal'
|
||||
@ -239,11 +230,11 @@ class NetworkTopologyCreateTests(test.TestCase):
|
||||
"<span class='fa fa-plus'></span>%s</a>" \
|
||||
% (url, classes, link_name)
|
||||
|
||||
self._test_new_button_disabled_when_quota_exceeded(
|
||||
expected_string, routers_quota=0)
|
||||
self._test_new_button_disabled_when_quota_exceeded(expected_string,
|
||||
routers_quota=0)
|
||||
|
||||
@test.update_settings(LAUNCH_INSTANCE_LEGACY_ENABLED=True)
|
||||
@test.create_stubs({quotas: ('tenant_quota_usages',)})
|
||||
@test.create_mocks({quotas: ('tenant_quota_usages',)})
|
||||
def test_launch_instance_button_disabled_when_quota_exceeded(self):
|
||||
url = reverse('horizon:project:network_topology:launchinstance')
|
||||
classes = 'btn btn-default btn-launch ajax-modal'
|
||||
@ -253,5 +244,5 @@ class NetworkTopologyCreateTests(test.TestCase):
|
||||
"<span class='fa fa-cloud-upload'></span>%s</a>" \
|
||||
% (url, classes, link_name)
|
||||
|
||||
self._test_new_button_disabled_when_quota_exceeded(
|
||||
expected_string, instances_quota=0)
|
||||
self._test_new_button_disabled_when_quota_exceeded(expected_string,
|
||||
instances_quota=0)
|
||||
|
@ -18,11 +18,10 @@
|
||||
|
||||
import cgi
|
||||
|
||||
from mox3.mox import IsA
|
||||
import mock
|
||||
import six
|
||||
|
||||
from django.conf import settings
|
||||
from django import http
|
||||
from django.urls import reverse
|
||||
|
||||
from horizon import exceptions
|
||||
@ -63,21 +62,15 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.edit_url = reverse(SG_ADD_RULE_VIEW, args=[sec_group.id])
|
||||
self.update_url = reverse(SG_UPDATE_VIEW, args=[sec_group.id])
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_list',),
|
||||
@test.create_mocks({api.neutron: ('security_group_list',),
|
||||
quotas: ('tenant_quota_usages',)})
|
||||
def test_index(self):
|
||||
sec_groups = self.security_groups.list()
|
||||
quota_data = self.neutron_quota_usages.first()
|
||||
quota_data['security_group']['available'] = 10
|
||||
|
||||
api.neutron.security_group_list(IsA(http.HttpRequest)) \
|
||||
.AndReturn(sec_groups)
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest),
|
||||
targets=('security_group', )).MultipleTimes() \
|
||||
.AndReturn(quota_data)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_groups
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
|
||||
res = self.client.get(INDEX_URL)
|
||||
|
||||
@ -96,22 +89,21 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
all([sec_groups_from_ctx[i].name <= sec_groups_from_ctx[i + 1].name
|
||||
for i in range(len(sec_groups_from_ctx) - 1)]))
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_list',),
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_tenant_quota_usages, 2,
|
||||
mock.call(test.IsHttpRequest(), targets=('security_group', )))
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_list',),
|
||||
quotas: ('tenant_quota_usages',)})
|
||||
def test_create_button_attributes(self):
|
||||
sec_groups = self.security_groups.list()
|
||||
quota_data = self.neutron_quota_usages.first()
|
||||
quota_data['security_group']['available'] = 10
|
||||
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)) \
|
||||
.AndReturn(sec_groups)
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest),
|
||||
targets=('security_group', )).MultipleTimes() \
|
||||
.AndReturn(quota_data)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_groups
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
|
||||
res = self.client.get(INDEX_URL)
|
||||
|
||||
@ -129,7 +121,13 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
url = 'horizon:project:security_groups:create'
|
||||
self.assertEqual(url, create_action.url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_list',),
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_tenant_quota_usages, 3,
|
||||
mock.call(test.IsHttpRequest(), targets=('security_group', )))
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_list',),
|
||||
quotas: ('tenant_quota_usages',)})
|
||||
def _test_create_button_disabled_when_quota_exceeded(self,
|
||||
network_enabled):
|
||||
@ -137,15 +135,8 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
quota_data = self.neutron_quota_usages.first()
|
||||
quota_data['security_group']['available'] = 0
|
||||
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)) \
|
||||
.AndReturn(sec_groups)
|
||||
quotas.tenant_quota_usages(
|
||||
IsA(http.HttpRequest),
|
||||
targets=('security_group', )).MultipleTimes() \
|
||||
.AndReturn(quota_data)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_groups
|
||||
self.mock_tenant_quota_usages.return_value = quota_data
|
||||
|
||||
res = self.client.get(INDEX_URL)
|
||||
|
||||
@ -157,21 +148,34 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.assertIn('disabled', create_action.classes,
|
||||
'The create button should be disabled')
|
||||
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_tenant_quota_usages, 3,
|
||||
mock.call(test.IsHttpRequest(), targets=('security_group', )))
|
||||
|
||||
def test_create_button_disabled_when_quota_exceeded_neutron_disabled(self):
|
||||
self._test_create_button_disabled_when_quota_exceeded(False)
|
||||
|
||||
def test_create_button_disabled_when_quota_exceeded_neutron_enabled(self):
|
||||
self._test_create_button_disabled_when_quota_exceeded(True)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def _add_security_group_rule_fixture(self, **kwargs):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_create(
|
||||
IsA(http.HttpRequest),
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
return sec_group, rule
|
||||
|
||||
def _check_add_security_group_rule(self, **kwargs):
|
||||
sec_group = self.security_groups.first()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
kwargs.get('sec_group', sec_group.id),
|
||||
kwargs.get('ingress', 'ingress'),
|
||||
kwargs.get('ethertype', 'IPv4'),
|
||||
@ -179,23 +183,22 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
kwargs.get('from_port', int(rule.from_port)),
|
||||
kwargs.get('to_port', int(rule.to_port)),
|
||||
kwargs.get('cidr', rule.ip_range['cidr']),
|
||||
kwargs.get('security_group', u'%s' % sec_group.id)).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
return sec_group, rule
|
||||
kwargs.get('security_group', u'%s' % sec_group.id))
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_get',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_get',)})
|
||||
def test_update_security_groups_get(self):
|
||||
sec_group = self.security_groups.first()
|
||||
api.neutron.security_group_get(IsA(http.HttpRequest),
|
||||
sec_group.id).AndReturn(sec_group)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_get.return_value = sec_group
|
||||
res = self.client.get(self.update_url)
|
||||
self.assertTemplateUsed(res, SG_UPDATE_TEMPLATE)
|
||||
self.assertEqual(res.context['security_group'].name,
|
||||
sec_group.name)
|
||||
self.mock_security_group_get.assert_called_once_with(
|
||||
test.IsHttpRequest(), sec_group.id)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_update',
|
||||
@test.create_mocks({api.neutron: ('security_group_update',
|
||||
'security_group_get')})
|
||||
def test_update_security_groups_post(self):
|
||||
"""Ensure that we can change a group name.
|
||||
@ -206,28 +209,28 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
"""
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group.name = "@new name"
|
||||
api.neutron.security_group_update(
|
||||
IsA(http.HttpRequest),
|
||||
str(sec_group.id),
|
||||
sec_group.name,
|
||||
sec_group.description).AndReturn(sec_group)
|
||||
api.neutron.security_group_get(
|
||||
IsA(http.HttpRequest), sec_group.id).AndReturn(sec_group)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_update.return_value = sec_group
|
||||
self.mock_security_group_get.return_value = sec_group
|
||||
form_data = {'method': 'UpdateGroup',
|
||||
'id': sec_group.id,
|
||||
'name': sec_group.name,
|
||||
'description': sec_group.description}
|
||||
res = self.client.post(self.update_url, form_data)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
self.mock_security_group_update.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
str(sec_group.id),
|
||||
sec_group.name,
|
||||
sec_group.description)
|
||||
self.mock_security_group_get.assert_called_once_with(
|
||||
test.IsHttpRequest(), sec_group.id)
|
||||
|
||||
def test_create_security_groups_get(self):
|
||||
res = self.client.get(SG_CREATE_URL)
|
||||
self.assertTemplateUsed(res, SG_CREATE_TEMPLATE)
|
||||
|
||||
def test_create_security_groups_post(self):
|
||||
sec_group = self.security_groups.first()
|
||||
self._create_security_group(sec_group)
|
||||
self._create_security_group()
|
||||
|
||||
def test_create_security_groups_special_chars(self):
|
||||
"""Ensure non-alphanumeric characters can be used as a group name.
|
||||
@ -235,32 +238,30 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
bug #1233501 Security group names cannot contain at characters
|
||||
bug #1224576 Security group names cannot contain spaces
|
||||
"""
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group.name = '@group name-\xe3\x82\xb3'
|
||||
self._create_security_group(sec_group)
|
||||
sg_name = b'@group name-\xe3\x82\xb3'.decode('utf8')
|
||||
self._create_security_group(sg_name=sg_name)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_create',)})
|
||||
def _create_security_group(self, sec_group):
|
||||
api.neutron.security_group_create(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.name,
|
||||
sec_group.description).AndReturn(sec_group)
|
||||
self.mox.ReplayAll()
|
||||
@test.create_mocks({api.neutron: ('security_group_create',)})
|
||||
def _create_security_group(self, sg_name=None):
|
||||
sec_group = self.security_groups.first()
|
||||
if sg_name is not None:
|
||||
sec_group.name = sg_name
|
||||
self.mock_security_group_create.return_value = sec_group
|
||||
|
||||
form_data = {'method': 'CreateGroup',
|
||||
'name': sec_group.name,
|
||||
'description': sec_group.description}
|
||||
res = self.client.post(SG_CREATE_URL, form_data)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
self.mock_security_group_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.name,
|
||||
sec_group.description)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_create',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_create',)})
|
||||
def test_create_security_groups_post_exception(self):
|
||||
sec_group = self.security_groups.first()
|
||||
api.neutron.security_group_create(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.name,
|
||||
sec_group.description).AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_create.side_effect = self.exceptions.nova
|
||||
|
||||
formData = {'method': 'CreateGroup',
|
||||
'name': sec_group.name,
|
||||
@ -268,33 +269,34 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(SG_CREATE_URL, formData)
|
||||
self.assertMessageCount(error=1)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
self.mock_security_group_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.name,
|
||||
sec_group.description)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_get',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_get',)})
|
||||
def test_detail_get(self):
|
||||
sec_group = self.security_groups.first()
|
||||
|
||||
api.neutron.security_group_get(IsA(http.HttpRequest),
|
||||
sec_group.id).AndReturn(sec_group)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_get.return_value = sec_group
|
||||
res = self.client.get(self.detail_url)
|
||||
self.assertTemplateUsed(res, SG_DETAIL_TEMPLATE)
|
||||
self.mock_security_group_get.assert_called_once_with(
|
||||
test.IsHttpRequest(), sec_group.id)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_get',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_get',)})
|
||||
def test_detail_get_exception(self):
|
||||
sec_group = self.security_groups.first()
|
||||
|
||||
api.neutron.security_group_get(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.id).AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.mock_security_group_get.side_effect = self.exceptions.nova
|
||||
res = self.client.get(self.detail_url)
|
||||
self.assertRedirectsNoFollow(res, INDEX_URL)
|
||||
self.mock_security_group_get.assert_called_once_with(
|
||||
test.IsHttpRequest(), sec_group.id)
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_cidr(self):
|
||||
sec_group, rule = self._add_security_group_rule_fixture(
|
||||
security_group=None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -305,11 +307,14 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
'remote': 'cidr'}
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
self._check_add_security_group_rule(
|
||||
security_group=None)
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_cidr_with_invalid_unused_fields(self):
|
||||
sec_group, rule = self._add_security_group_rule_fixture(
|
||||
security_group=None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -327,11 +332,14 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertNoFormErrors(res)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
self._check_add_security_group_rule(
|
||||
security_group=None)
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_securitygroup_with_invalid_unused_fields(self):
|
||||
sec_group, rule = self._add_security_group_rule_fixture(
|
||||
cidr=None, ethertype='')
|
||||
self.mox.ReplayAll()
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -349,11 +357,14 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertNoFormErrors(res)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
self._check_add_security_group_rule(
|
||||
cidr=None, ethertype='')
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_icmp_with_invalid_unused_fields(self):
|
||||
sec_group, rule = self._add_security_group_rule_fixture(
|
||||
ip_protocol='icmp', security_group=None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -371,25 +382,18 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertNoFormErrors(res)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
self._check_add_security_group_rule(
|
||||
ip_protocol='icmp', security_group=None)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_cidr_with_template(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
|
||||
sec_group.id,
|
||||
'ingress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -400,33 +404,33 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id,
|
||||
'ingress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
def _get_source_group_rule(self):
|
||||
for rule in self.security_group_rules.list():
|
||||
if rule.group:
|
||||
return rule
|
||||
raise Exception("No matches found.")
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_self_as_source_group(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self._get_source_group_rule()
|
||||
|
||||
api.neutron.security_group_rule_create(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.id,
|
||||
'ingress',
|
||||
# ethertype is empty for source_group of Nova Security Group
|
||||
'',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
None,
|
||||
u'%s' % sec_group.id).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -439,15 +443,8 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list',)})
|
||||
def test_detail_add_rule_self_as_source_group_with_template(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self._get_source_group_rule()
|
||||
|
||||
api.neutron.security_group_rule_create(
|
||||
IsA(http.HttpRequest),
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id,
|
||||
'ingress',
|
||||
# ethertype is empty for source_group of Nova Security Group
|
||||
@ -456,10 +453,19 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
None,
|
||||
u'%s' % sec_group.id).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
u'%s' % sec_group.id)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_self_as_source_group_with_template(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self._get_source_group_rule()
|
||||
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -471,17 +477,27 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_list',)})
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id,
|
||||
'ingress',
|
||||
# ethertype is empty for source_group of Nova Security Group
|
||||
'',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
None,
|
||||
u'%s' % sec_group.id)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_list',)})
|
||||
def test_detail_invalid_port(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
for i in range(2):
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -494,17 +510,17 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.assertNoMessages()
|
||||
self.assertContains(res, "The specified port is invalid")
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_list',)})
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_security_group_list, 2,
|
||||
mock.call(test.IsHttpRequest()))
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_list',)})
|
||||
def test_detail_invalid_port_range(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
for i in range(6):
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -544,20 +560,17 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.assertContains(res, cgi.escape('"to" port number is invalid',
|
||||
quote=True))
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_get',
|
||||
'security_group_list')})
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_security_group_list, 6,
|
||||
mock.call(test.IsHttpRequest()))
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_list',)})
|
||||
def test_detail_invalid_icmp_rule(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
icmp_rule = self.security_group_rules.list()[1]
|
||||
|
||||
# Call POST 5 times (*2 if Django >= 1.9)
|
||||
call_post = 5 * 2
|
||||
for i in range(call_post):
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -620,24 +633,19 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.assertContains(
|
||||
res, "ICMP code is provided but ICMP type is missing.")
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_security_group_list, 10,
|
||||
mock.call(test.IsHttpRequest()))
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_exception(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_create(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None).AndRaise(self.exceptions.nova)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.side_effect = self.exceptions.nova
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -649,24 +657,26 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_duplicated(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_create(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None).AndRaise(exceptions.Conflict)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.side_effect = exceptions.Conflict
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -679,13 +689,22 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.assertNoFormErrors(res)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_delete',)})
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_delete',)})
|
||||
def test_detail_delete_rule(self):
|
||||
sec_group = self.security_groups.first()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_delete(IsA(http.HttpRequest), rule.id)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_delete.return_value = None
|
||||
|
||||
form_data = {"action": "rules__delete__%s" % rule.id}
|
||||
req = self.factory.post(self.edit_url, form_data)
|
||||
@ -694,16 +713,14 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
handled = table.maybe_handle()
|
||||
self.assertEqual(strip_absolute_base(handled['location']),
|
||||
self.detail_url)
|
||||
self.mock_security_group_rule_delete.assert_called_once_with(
|
||||
test.IsHttpRequest(), rule.id)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_delete',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_delete',)})
|
||||
def test_detail_delete_rule_exception(self):
|
||||
sec_group = self.security_groups.first()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_delete(
|
||||
IsA(http.HttpRequest),
|
||||
rule.id).AndRaise(self.exceptions.nova)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_delete.side_effect = self.exceptions.nova
|
||||
|
||||
form_data = {"action": "rules__delete__%s" % rule.id}
|
||||
req = self.factory.post(self.edit_url, form_data)
|
||||
@ -713,13 +730,13 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
handled = table.maybe_handle()
|
||||
self.assertEqual(strip_absolute_base(handled['location']),
|
||||
self.detail_url)
|
||||
self.mock_security_group_rule_delete.assert_called_once_with(
|
||||
test.IsHttpRequest(), rule.id)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_delete',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_delete',)})
|
||||
def test_delete_group(self):
|
||||
sec_group = self.security_groups.get(name="other_group")
|
||||
|
||||
api.neutron.security_group_delete(IsA(http.HttpRequest), sec_group.id)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_delete.return_value = None
|
||||
|
||||
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
|
||||
req = self.factory.post(INDEX_URL, form_data)
|
||||
@ -727,16 +744,13 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
handled = table.maybe_handle()
|
||||
self.assertEqual(strip_absolute_base(handled['location']),
|
||||
INDEX_URL)
|
||||
self.mock_security_group_delete.assert_called_once_with(
|
||||
test.IsHttpRequest(), sec_group.id)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_delete',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_delete',)})
|
||||
def test_delete_group_exception(self):
|
||||
sec_group = self.security_groups.get(name="other_group")
|
||||
|
||||
api.neutron.security_group_delete(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.id).AndRaise(self.exceptions.nova)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_delete.side_effect = self.exceptions.nova
|
||||
|
||||
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
|
||||
req = self.factory.post(INDEX_URL, form_data)
|
||||
@ -745,21 +759,18 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
|
||||
self.assertEqual(strip_absolute_base(handled['location']),
|
||||
INDEX_URL)
|
||||
self.mock_security_group_delete.assert_called_once_with(
|
||||
test.IsHttpRequest(), sec_group.id)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_custom_protocol(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
|
||||
sec_group.id, 'ingress', 'IPv6',
|
||||
37, None, None, 'fe80::/48',
|
||||
None).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -772,20 +783,23 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'ingress', 'IPv6',
|
||||
37, None, None, 'fe80::/48',
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_egress(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
|
||||
sec_group.id, 'egress', 'IPv4',
|
||||
'udp', 80, 80, '10.1.1.0/24',
|
||||
None).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -798,23 +812,23 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'egress', 'IPv4',
|
||||
'udp', 80, 80, '10.1.1.0/24',
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_egress_with_all_tcp(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.list()[3]
|
||||
|
||||
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
|
||||
sec_group.id, 'egress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -826,27 +840,26 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'egress', 'IPv4',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
rule.ip_range['cidr'],
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_source_group_with_direction_ethertype(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self._get_source_group_rule()
|
||||
|
||||
api.neutron.security_group_rule_create(
|
||||
IsA(http.HttpRequest),
|
||||
sec_group.id,
|
||||
'egress',
|
||||
# ethertype is empty for source_group of Nova Security Group
|
||||
'IPv6',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
None,
|
||||
u'%s' % sec_group.id).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -861,13 +874,26 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id,
|
||||
'egress',
|
||||
# ethertype is empty for source_group of Nova Security Group
|
||||
'IPv6',
|
||||
rule.ip_protocol,
|
||||
int(rule.from_port),
|
||||
int(rule.to_port),
|
||||
None,
|
||||
u'%s' % sec_group.id)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.update_settings(
|
||||
OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False})
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
@test.create_mocks({api.neutron: ('security_group_list',)})
|
||||
def test_add_rule_ethertype_with_ipv6_disabled(self):
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = \
|
||||
self.security_groups.list()
|
||||
|
||||
res = self.client.get(self.edit_url)
|
||||
|
||||
@ -883,14 +909,16 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res.context['form']['ethertype'].field.initial,
|
||||
'IPv4'
|
||||
)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.update_settings(
|
||||
OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False})
|
||||
@test.create_stubs({api.neutron: ('security_group_list',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_list',)})
|
||||
def test_add_rule_cidr_with_ipv6_disabled(self):
|
||||
sec_group = self.security_groups.first()
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = \
|
||||
self.security_groups.list()
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -904,18 +932,17 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertFormError(res, 'form', 'cidr',
|
||||
'Invalid version for IP address')
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_security_group_list, 2,
|
||||
mock.call(test.IsHttpRequest()))
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_list',)})
|
||||
@test.create_mocks({api.neutron: ('security_group_list',)})
|
||||
def test_detail_add_rule_invalid_port(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.first()
|
||||
|
||||
for i in range(2):
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'method': 'AddRule',
|
||||
'id': sec_group.id,
|
||||
@ -928,23 +955,19 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
self.assertNoMessages()
|
||||
self.assertContains(res, "Not a valid port number")
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.assert_mock_multiple_calls_with_same_arguments(
|
||||
self.mock_security_group_list, 2,
|
||||
mock.call(test.IsHttpRequest()))
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_ingress_tcp_without_port(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.list()[3]
|
||||
|
||||
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
'tcp',
|
||||
None,
|
||||
None,
|
||||
rule.ip_range['cidr'],
|
||||
None).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'id': sec_group.id,
|
||||
'direction': 'ingress',
|
||||
@ -955,23 +978,26 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
@test.create_stubs({api.neutron: ('security_group_rule_create',
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
'tcp',
|
||||
None,
|
||||
None,
|
||||
rule.ip_range['cidr'],
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
||||
@test.create_mocks({api.neutron: ('security_group_rule_create',
|
||||
'security_group_list')})
|
||||
def test_detail_add_rule_custom_without_protocol(self):
|
||||
sec_group = self.security_groups.first()
|
||||
sec_group_list = self.security_groups.list()
|
||||
rule = self.security_group_rules.list()[3]
|
||||
|
||||
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
rule.ip_range['cidr'],
|
||||
None).AndReturn(rule)
|
||||
api.neutron.security_group_list(
|
||||
IsA(http.HttpRequest)).AndReturn(sec_group_list)
|
||||
self.mox.ReplayAll()
|
||||
self.mock_security_group_rule_create.return_value = rule
|
||||
self.mock_security_group_list.return_value = sec_group_list
|
||||
|
||||
formData = {'id': sec_group.id,
|
||||
'direction': 'ingress',
|
||||
@ -982,3 +1008,14 @@ class SecurityGroupsViewTests(test.TestCase):
|
||||
'remote': 'cidr'}
|
||||
res = self.client.post(self.edit_url, formData)
|
||||
self.assertRedirectsNoFollow(res, self.detail_url)
|
||||
|
||||
self.mock_security_group_rule_create.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
sec_group.id, 'ingress', 'IPv4',
|
||||
-1,
|
||||
None,
|
||||
None,
|
||||
rule.ip_range['cidr'],
|
||||
None)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest())
|
||||
|
Loading…
Reference in New Issue
Block a user