Drop novalclient security group and floating IP dependency

novaclient 8.0.0 dropped python bindings for security group and
floating IP. This commit drops security group and floating IP logics
from the nova API wrapper.

The following changes are made accordingly.

* Update unit tests to consume neutron test data
* Drop API unit tests for nova security group and floating IP

Partially implement blueprint drop-nova-network
Change-Id: I946c508d7a82162fc8434213e006513867b79350
This commit is contained in:
Akihiro Motoki 2017-04-16 06:27:13 +00:00
parent f23c84189c
commit acd3f2a240
12 changed files with 161 additions and 694 deletions

View File

@ -21,25 +21,24 @@ different dashboard implementations.
from openstack_dashboard.api import base
from openstack_dashboard.api import neutron
from openstack_dashboard.api import nova
class NetworkClient(object):
def __init__(self, request):
# TODO(amotoki): neutron check needs to be dropped.
# The network API wrapper can depend on neutron.
neutron_enabled = base.is_service_enabled(request, 'network')
nova_enabled = base.is_service_enabled(request, 'compute')
self.secgroups, self.floating_ips = None, None
if neutron_enabled:
self.floating_ips = neutron.FloatingIpManager(request)
elif nova_enabled:
self.floating_ips = nova.FloatingIpManager(request)
else:
self.floating_ips = None
if (neutron_enabled and
neutron.is_extension_supported(request, 'security-group')):
self.secgroups = neutron.SecurityGroupManager(request)
elif nova_enabled:
self.secgroups = nova.SecurityGroupManager(request)
else:
self.secgroups = None
@property
def enabled(self):
@ -150,6 +149,14 @@ def servers_update_addresses(request, servers, all_tenants=False):
and Nova's networking info caching mechanism is not fast enough.
"""
# NOTE(amotoki): This check is still needed because 'instances' panel
# calls this method. We dropped security group and floating IP support
# through Nova API (due to novaclient 8.0.0 drops their supports),
# but we can still support 'Instances' panel with nova-network.
# TODO(amotoki): Nova networkinfo info caching mechanism is now fast enough
# as they are updated by Neutron via Nova event callback mechasm,
# so servers_update_addresses is no longer needed.
# We can reduce API calls by dropping it.
neutron_enabled = base.is_service_enabled(request, 'network')
if neutron_enabled:
neutron.servers_update_addresses(request, servers, all_tenants)

View File

@ -24,19 +24,15 @@ import collections
import logging
from django.conf import settings
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
import six
from novaclient import api_versions
from novaclient import client as nova_client
from novaclient import exceptions as nova_exceptions
from novaclient.v2 import instance_action as nova_instance_action
from novaclient.v2 import list_extensions as nova_list_extensions
from novaclient.v2 import security_group_rules as nova_rules
from novaclient.v2 import servers as nova_servers
from horizon import conf
from horizon import exceptions as horizon_exceptions
from horizon.utils import functions as utils
from horizon.utils.memoized import memoized
@ -44,7 +40,6 @@ from horizon.utils.memoized import memoized_with_request
from openstack_dashboard.api import base
from openstack_dashboard.api import microversions
from openstack_dashboard.api import network_base
from openstack_dashboard.contrib.developer.profiler import api as profiler
LOG = logging.getLogger(__name__)
@ -221,154 +216,6 @@ class NovaUsage(base.APIResourceWrapper):
return getattr(self, "total_memory_mb_usage", 0)
class SecurityGroup(base.APIResourceWrapper):
"""Wrapper around novaclient.security_groups.SecurityGroup.
Wraps its rules in SecurityGroupRule objects and allows access to them.
"""
_attrs = ['id', 'name', 'description', 'tenant_id']
@cached_property
def rules(self):
"""Wraps transmitted rule info in the novaclient rule class."""
manager = nova_rules.SecurityGroupRuleManager(None)
rule_objs = [nova_rules.SecurityGroupRule(manager, rule)
for rule in self._apiresource.rules]
return [SecurityGroupRule(rule) for rule in rule_objs]
def to_dict(self):
return self._apiresource.to_dict()
@six.python_2_unicode_compatible
class SecurityGroupRule(base.APIResourceWrapper):
"""Wrapper for individual rules in a SecurityGroup."""
_attrs = ['id', 'ip_protocol', 'from_port', 'to_port', 'ip_range', 'group']
def __str__(self):
vals = {
'range': '%s:%s' % (self.from_port, self.to_port),
'ip_protocol': self.ip_protocol
}
if self.from_port == -1 and self.to_port == -1:
vals['range'] = 'any port'
if 'name' in self.group:
vals['group'] = self.group['name']
return (_('ALLOW %(range)s/%(ip_protocol)s from %(group)s') % vals)
else:
vals['cidr'] = self.ip_range['cidr']
return (_('ALLOW %(range)s/%(ip_protocol)s from %(cidr)s') % vals)
# The following attributes are defined to keep compatibility with Neutron
@property
def ethertype(self):
return None
@property
def direction(self):
return 'ingress'
class SecurityGroupManager(network_base.SecurityGroupManager):
backend = 'nova'
def __init__(self, request):
self.request = request
self.client = novaclient(request)
def list(self):
return [SecurityGroup(g) for g
in self.client.security_groups.list()]
def get(self, sg_id):
return SecurityGroup(self.client.security_groups.get(sg_id))
def create(self, name, desc):
return SecurityGroup(self.client.security_groups.create(name, desc))
def update(self, sg_id, name, desc):
return SecurityGroup(self.client.security_groups.update(sg_id,
name, desc))
def delete(self, security_group_id):
self.client.security_groups.delete(security_group_id)
def rule_create(self, parent_group_id,
direction=None, ethertype=None,
ip_protocol=None, from_port=None, to_port=None,
cidr=None, group_id=None):
# Nova Security Group API does not use direction and ethertype fields.
try:
sg = self.client.security_group_rules.create(parent_group_id,
ip_protocol,
from_port,
to_port,
cidr,
group_id)
except nova_exceptions.BadRequest:
raise horizon_exceptions.Conflict(
_('Security group rule already exists.'))
return SecurityGroupRule(sg)
def rule_delete(self, security_group_rule_id):
self.client.security_group_rules.delete(security_group_rule_id)
def list_by_instance(self, instance_id):
"""Gets security groups of an instance."""
return [SecurityGroup(sg) for sg
in self.client.servers.list_security_group(instance_id)]
def update_instance_security_group(self, instance_id,
new_security_group_ids):
try:
all_groups = self.list()
except Exception:
raise Exception(_("Couldn't get security group list."))
wanted_groups = set([sg.name for sg in all_groups
if sg.id in new_security_group_ids])
try:
current_groups = self.list_by_instance(instance_id)
except Exception:
raise Exception(_("Couldn't get current security group "
"list for instance %s.")
% instance_id)
current_group_names = set([sg.name for sg in current_groups])
groups_to_add = wanted_groups - current_group_names
groups_to_remove = current_group_names - wanted_groups
num_groups_to_modify = len(groups_to_add | groups_to_remove)
try:
for group in groups_to_add:
self.client.servers.add_security_group(instance_id, group)
num_groups_to_modify -= 1
for group in groups_to_remove:
self.client.servers.remove_security_group(instance_id, group)
num_groups_to_modify -= 1
except nova_exceptions.ClientException as err:
LOG.error("Failed to modify %(num_groups_to_modify)d instance "
"security groups: %(err)s",
{'num_groups_to_modify': num_groups_to_modify,
'err': err})
# reraise novaclient.exceptions.ClientException, but with
# a sanitized error message so we don't risk exposing
# sensitive information to the end user. This has to be
# novaclient.exceptions.ClientException, not just
# Exception, since the former is recognized as a
# "recoverable" exception by horizon, and therefore the
# error message is passed along to the end user, while
# Exception is swallowed alive by horizon and a generic
# error message is given to the end user
raise nova_exceptions.ClientException(
err.code,
_("Failed to modify %d instance security groups") %
num_groups_to_modify)
return True
class FlavorExtraSpec(object):
def __init__(self, flavor_id, key, val):
self.flavor_id = flavor_id
@ -377,89 +224,6 @@ class FlavorExtraSpec(object):
self.value = val
class FloatingIp(base.APIResourceWrapper):
_attrs = ['id', 'ip', 'fixed_ip', 'port_id', 'instance_id',
'instance_type', 'pool']
def __init__(self, fip):
fip.__setattr__('port_id', fip.instance_id)
fip.__setattr__('instance_type',
'compute' if fip.instance_id else None)
super(FloatingIp, self).__init__(fip)
class FloatingIpPool(base.APIDictWrapper):
def __init__(self, pool):
pool_dict = {'id': pool.name,
'name': pool.name}
super(FloatingIpPool, self).__init__(pool_dict)
class FloatingIpTarget(base.APIDictWrapper):
def __init__(self, server):
server_dict = {'name': '%s (%s)' % (server.name, server.id),
'id': server.id}
super(FloatingIpTarget, self).__init__(server_dict)
class FloatingIpManager(network_base.FloatingIpManager):
def __init__(self, request):
self.request = request
self.client = novaclient(request)
def list_pools(self):
return [FloatingIpPool(pool)
for pool in self.client.floating_ip_pools.list()]
@profiler.trace
def list(self, all_tenants=False):
return [FloatingIp(fip) for fip in
self.client.floating_ips.list(
all_tenants=all_tenants)]
@profiler.trace
def get(self, floating_ip_id):
return FloatingIp(self.client.floating_ips.get(floating_ip_id))
@profiler.trace
def allocate(self, pool, tenant_id=None, **params):
# NOTE: tenant_id will never be used here.
return FloatingIp(self.client.floating_ips.create(pool=pool))
@profiler.trace
def release(self, floating_ip_id):
self.client.floating_ips.delete(floating_ip_id)
@profiler.trace
def associate(self, floating_ip_id, port_id):
# In Nova implied port_id is instance_id
server = self.client.servers.get(port_id)
fip = self.client.floating_ips.get(floating_ip_id)
self.client.servers.add_floating_ip(server.id, fip.ip)
@profiler.trace
def disassociate(self, floating_ip_id):
fip = self.client.floating_ips.get(floating_ip_id)
server = self.client.servers.get(fip.instance_id)
self.client.servers.remove_floating_ip(server.id, fip.ip)
@profiler.trace
def list_targets(self):
return [FloatingIpTarget(s) for s in self.client.servers.list()]
def get_target_id_by_instance(self, instance_id, target_list=None):
return instance_id
def list_target_id_by_instance(self, instance_id, target_list=None):
return [instance_id, ]
def is_simple_associate_supported(self):
return conf.HORIZON_CONFIG["simple_ip_management"]
def is_supported(self):
return True
def get_auth_params_from_request(request):
"""Extracts the properties from the request object needed by the novaclient
call below. These will be used to memoize the calls to novaclient

View File

@ -31,7 +31,7 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
api.neutron: ('network_list', )})
def test_index(self):
# Use neutron test data
fips = self.q_floating_ips.list()
fips = self.floating_ips.list()
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
@ -61,7 +61,7 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
@test.create_stubs({api.network: ('tenant_floating_ip_get', ),
api.neutron: ('network_get', )})
def test_floating_ip_detail_get(self):
fip = self.q_floating_ips.first()
fip = self.floating_ips.first()
network = self.networks.first()
api.network.tenant_floating_ip_get(
IsA(http.HttpRequest), fip.id).AndReturn(fip)
@ -77,7 +77,7 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
@test.create_stubs({api.network: ('tenant_floating_ip_get',)})
def test_floating_ip_detail_exception(self):
fip = self.q_floating_ips.first()
fip = self.floating_ips.first()
# Only supported by neutron, so raise a neutron exception
api.network.tenant_floating_ip_get(
IsA(http.HttpRequest),
@ -187,8 +187,8 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
api.neutron: ('network_list', )})
def test_admin_disassociate_floatingip(self):
# Use neutron test data
fips = self.q_floating_ips.list()
floating_ip = self.q_floating_ips.list()[1]
fips = self.floating_ips.list()
floating_ip = self.floating_ips.list()[1]
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
@ -217,8 +217,8 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
api.neutron: ('network_list', )})
def test_admin_delete_floatingip(self):
# Use neutron test data
fips = self.q_floating_ips.list()
floating_ip = self.q_floating_ips.list()[1]
fips = self.floating_ips.list()
floating_ip = self.floating_ips.list()[1]
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
@ -246,7 +246,7 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
api.neutron: ('network_list', )})
def test_floating_ip_table_actions(self):
# Use neutron test data
fips = self.q_floating_ips.list()
fips = self.floating_ips.list()
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),

View File

@ -108,7 +108,7 @@ class UsageViewTests(test.BaseAdminViewTests):
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_secgroups.list())
.AndReturn(self.security_groups.list())
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)) \
.AndReturn(self.cinder_limits['absolute'])
@ -204,7 +204,7 @@ class UsageViewTests(test.BaseAdminViewTests):
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_secgroups.list())
.AndReturn(self.security_groups.list())
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)) \
.AndReturn(self.cinder_limits['absolute'])
self.mox.ReplayAll()

View File

@ -1579,7 +1579,7 @@ class UsageViewTests(test.BaseAdminViewTests):
.AndReturn(self.floating_ips.list())
if neutron_sg_enabled:
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_secgroups.list())
.AndReturn(self.security_groups.list())
def test_usage_csv(self):
self._test_usage_csv(nova_stu_enabled=True)

View File

@ -43,7 +43,7 @@ class FloatingIpViewTests(test.TestCase):
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
self.mox.ReplayAll()
url = reverse('%s:associate' % NAMESPACE)
@ -52,7 +52,7 @@ class FloatingIpViewTests(test.TestCase):
workflow = res.context['workflow']
choices = dict(workflow.steps[0].action.fields['ip_id'].choices)
# Verify that our "associated" floating IP isn't in the choices list.
self.assertNotIn(self.q_floating_ips.first(), choices)
self.assertNotIn(self.floating_ips.first(), choices)
@test.create_stubs({api.network: ('floating_ip_target_list',
'floating_ip_target_get_by_instance',
@ -66,7 +66,7 @@ class FloatingIpViewTests(test.TestCase):
IsA(http.HttpRequest), target.instance_id, targets) \
.AndReturn(target.id)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
self.mox.ReplayAll()
base_url = reverse('%s:associate' % NAMESPACE)
@ -77,7 +77,7 @@ class FloatingIpViewTests(test.TestCase):
workflow = res.context['workflow']
choices = dict(workflow.steps[0].action.fields['ip_id'].choices)
# Verify that our "associated" floating IP isn't in the choices list.
self.assertNotIn(self.q_floating_ips.first(), choices)
self.assertNotIn(self.floating_ips.first(), choices)
def _get_compute_ports(self):
return [p for p in self.ports.list()
@ -104,13 +104,13 @@ class FloatingIpViewTests(test.TestCase):
'tenant_floating_ip_list',)})
def test_associate_with_port_id(self):
compute_port = self._get_compute_ports()[0]
associated_fips = [fip.id for fip in self.q_floating_ips.list()
associated_fips = [fip.id for fip in self.floating_ips.list()
if fip.port_id]
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
self.mox.ReplayAll()
base_url = reverse('%s:associate' % NAMESPACE)
@ -127,13 +127,13 @@ class FloatingIpViewTests(test.TestCase):
'floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_post(self):
floating_ip = [fip for fip in self.q_floating_ips.list()
floating_ip = [fip for fip in self.floating_ips.list()
if not fip.port_id][0]
compute_port = self._get_compute_ports()[0]
port_target_id = self._get_target_id(compute_port)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.floating_ip_associate(IsA(http.HttpRequest),
@ -151,13 +151,13 @@ class FloatingIpViewTests(test.TestCase):
'floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_post_with_redirect(self):
floating_ip = [fip for fip in self.q_floating_ips.list()
floating_ip = [fip for fip in self.floating_ips.list()
if not fip.port_id][0]
compute_port = self._get_compute_ports()[0]
port_target_id = self._get_target_id(compute_port)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.floating_ip_associate(IsA(http.HttpRequest),
@ -176,13 +176,13 @@ class FloatingIpViewTests(test.TestCase):
'floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_post_with_exception(self):
floating_ip = [fip for fip in self.q_floating_ips.list()
floating_ip = [fip for fip in self.floating_ips.list()
if not fip.port_id][0]
compute_port = self._get_compute_ports()[0]
port_target_id = self._get_target_id(compute_port)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.floating_ip_associate(IsA(http.HttpRequest),
@ -203,12 +203,12 @@ class FloatingIpViewTests(test.TestCase):
'tenant_floating_ip_list',),
api.neutron: ('is_extension_supported',)})
def test_disassociate_post(self):
floating_ip = self.q_floating_ips.first()
floating_ip = self.floating_ips.first()
api.nova.server_list(IsA(http.HttpRequest), detailed=False) \
.AndReturn([self.servers.list(), False])
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'subnet_allocation')\
.AndReturn(True)
@ -227,12 +227,12 @@ class FloatingIpViewTests(test.TestCase):
'tenant_floating_ip_list',),
api.neutron: ('is_extension_supported',)})
def test_disassociate_post_with_exception(self):
floating_ip = self.q_floating_ips.first()
floating_ip = self.floating_ips.first()
api.nova.server_list(IsA(http.HttpRequest), detailed=False) \
.AndReturn([self.servers.list(), False])
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_floating_ips.list())
.AndReturn(self.floating_ips.list())
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'subnet_allocation')\
.AndReturn(True)
@ -252,7 +252,7 @@ class FloatingIpViewTests(test.TestCase):
quotas: ('tenant_quota_usages',),
api.base: ('is_service_enabled',)})
def test_allocate_button_attributes(self):
floating_ips = self.q_floating_ips.list()
floating_ips = self.floating_ips.list()
floating_pools = self.pools.list()
quota_data = self.quota_usages.first()
quota_data['floating_ips']['available'] = 10
@ -290,7 +290,7 @@ class FloatingIpViewTests(test.TestCase):
quotas: ('tenant_quota_usages',),
api.base: ('is_service_enabled',)})
def test_allocate_button_disabled_when_quota_exceeded(self):
floating_ips = self.q_floating_ips.list()
floating_ips = self.floating_ips.list()
floating_pools = self.pools.list()
quota_data = self.quota_usages.first()
quota_data['floating_ips']['available'] = 0
@ -371,11 +371,11 @@ class FloatingIpViewTests(test.TestCase):
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(self.q_floating_ips.list())
.MultipleTimes().AndReturn(self.floating_ips.list())
api.network.floating_ip_pools_list(IsA(http.HttpRequest)) \
.AndReturn(self.pools.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_secgroups.list())
.AndReturn(self.security_groups.list())
self.mox.ReplayAll()
url = reverse('%s:allocate' % NAMESPACE)

View File

@ -3547,7 +3547,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
def test_disassociate_floating_ip(self):
servers = self.servers.list()
server = servers[0]
fip = self.q_floating_ips.first()
fip = self.floating_ips.first()
fip.port_id = server.id
search_opts = {'marker': None, 'paginate': True}

View File

@ -81,7 +81,7 @@ class UsageViewTests(test.TestCase):
.AndReturn(self.floating_ips.list())
if neutron_sg_enabled:
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_secgroups.list())
.AndReturn(self.security_groups.list())
def _nova_stu_enabled(self, exception=False, overview_days_range=1):
now = timezone.now()
@ -271,7 +271,7 @@ class UsageViewTests(test.TestCase):
.AndReturn(self.floating_ips.list())
if neutron_sg_enabled:
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.q_secgroups.list())
.AndReturn(self.security_groups.list())
api.neutron.tenant_quota_get(IsA(http.HttpRequest), self.tenant.id) \
.AndReturn(self.neutron_quotas.first())
self.mox.ReplayAll()

View File

@ -60,7 +60,7 @@ class SecurityGroupsViewTests(test.TestCase):
def setUp(self):
super(SecurityGroupsViewTests, self).setUp()
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
self.detail_url = reverse(SG_DETAIL_VIEW, args=[sec_group.id])
self.edit_url = reverse(SG_ADD_RULE_VIEW, args=[sec_group.id])
self.update_url = reverse(SG_UPDATE_VIEW, args=[sec_group.id])
@ -68,7 +68,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_list',),
quotas: ('tenant_quota_usages',)})
def test_index(self):
sec_groups = self.q_secgroups.list()
sec_groups = self.security_groups.list()
quota_data = self.quota_usages.first()
quota_data['security_groups']['available'] = 10
@ -99,7 +99,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_list',),
quotas: ('tenant_quota_usages',)})
def test_create_button_attributes(self):
sec_groups = self.q_secgroups.list()
sec_groups = self.security_groups.list()
quota_data = self.quota_usages.first()
quota_data['security_groups']['available'] = 10
@ -115,7 +115,7 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.get(INDEX_URL)
security_groups = res.context['security_groups_table'].data
self.assertItemsEqual(security_groups, self.q_secgroups.list())
self.assertItemsEqual(security_groups, self.security_groups.list())
create_action = self.getAndAssertTableAction(res, 'security_groups',
'create')
@ -132,7 +132,7 @@ class SecurityGroupsViewTests(test.TestCase):
quotas: ('tenant_quota_usages',)})
def _test_create_button_disabled_when_quota_exceeded(self,
network_enabled):
sec_groups = self.q_secgroups.list()
sec_groups = self.security_groups.list()
quota_data = self.quota_usages.first()
quota_data['security_groups']['available'] = 0
@ -148,7 +148,7 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.get(INDEX_URL)
security_groups = res.context['security_groups_table'].data
self.assertItemsEqual(security_groups, self.q_secgroups.list())
self.assertItemsEqual(security_groups, self.security_groups.list())
create_action = self.getAndAssertTableAction(res, 'security_groups',
'create')
@ -164,9 +164,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def _add_security_group_rule_fixture(self, **kwargs):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(
IsA(http.HttpRequest),
@ -184,7 +184,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_get',)})
def test_update_security_groups_get(self):
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
api.network.security_group_get(IsA(http.HttpRequest),
sec_group.id).AndReturn(sec_group)
self.mox.ReplayAll()
@ -202,7 +202,7 @@ class SecurityGroupsViewTests(test.TestCase):
bug #1233501 Security group names cannot contain at characters
bug #1224576 Security group names cannot contain spaces
"""
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
sec_group.name = "@new name"
api.network.security_group_update(
IsA(http.HttpRequest),
@ -224,7 +224,7 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertTemplateUsed(res, SG_CREATE_TEMPLATE)
def test_create_security_groups_post(self):
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
self._create_security_group(sec_group)
def test_create_security_groups_special_chars(self):
@ -234,7 +234,7 @@ class SecurityGroupsViewTests(test.TestCase):
bug #1233501 Security group names cannot contain at characters
bug #1224576 Security group names cannot contain spaces
"""
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
sec_group.name = '@group name-\xe3\x82\xb3'
self._create_security_group(sec_group)
@ -254,7 +254,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_create',)})
def test_create_security_groups_post_exception(self):
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
api.network.security_group_create(
IsA(http.HttpRequest),
sec_group.name,
@ -270,7 +270,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_get',)})
def test_detail_get(self):
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
api.network.security_group_get(IsA(http.HttpRequest),
sec_group.id).AndReturn(sec_group)
@ -280,7 +280,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_get',)})
def test_detail_get_exception(self):
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
api.network.security_group_get(
IsA(http.HttpRequest),
@ -374,9 +374,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_cidr_with_template(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
@ -400,7 +400,7 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertRedirectsNoFollow(res, self.detail_url)
def _get_source_group_rule(self):
for rule in self.q_secgroup_rules.list():
for rule in self.security_group_rules.list():
if rule.group:
return rule
raise Exception("No matches found.")
@ -408,8 +408,8 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',)})
def test_detail_add_rule_self_as_source_group(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_rule_create(
@ -441,8 +441,8 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',)})
def test_detail_add_rule_self_as_source_group_with_template(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_rule_create(
@ -472,9 +472,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_list',)})
def test_detail_invalid_port(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
@ -497,9 +497,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_list',)})
def test_detail_invalid_port_range(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
for i in range(3):
api.network.security_group_list(
@ -552,9 +552,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_get',
'security_group_list')})
def test_detail_invalid_icmp_rule(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
icmp_rule = self.q_secgroup_rules.list()[1]
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
icmp_rule = self.security_group_rules.list()[1]
# Call POST 5 times (*2 if Django >= 1.9)
call_post = 5
@ -631,9 +631,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_exception(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(
IsA(http.HttpRequest),
@ -660,9 +660,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_duplicated(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(
IsA(http.HttpRequest),
@ -689,8 +689,8 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_delete',)})
def test_detail_delete_rule(self):
sec_group = self.q_secgroups.first()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
rule = self.security_group_rules.first()
api.network.security_group_rule_delete(IsA(http.HttpRequest), rule.id)
self.mox.ReplayAll()
@ -705,8 +705,8 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_delete',)})
def test_detail_delete_rule_exception(self):
sec_group = self.q_secgroups.first()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
rule = self.security_group_rules.first()
api.network.security_group_rule_delete(
IsA(http.HttpRequest),
@ -717,28 +717,28 @@ class SecurityGroupsViewTests(test.TestCase):
req = self.factory.post(self.edit_url, form_data)
kwargs = {'security_group_id': sec_group.id}
table = tables.RulesTable(
req, self.q_secgroup_rules.list(), **kwargs)
req, self.security_group_rules.list(), **kwargs)
handled = table.maybe_handle()
self.assertEqual(strip_absolute_base(handled['location']),
self.detail_url)
@test.create_stubs({api.network: ('security_group_delete',)})
def test_delete_group(self):
sec_group = self.q_secgroups.get(name="other_group")
sec_group = self.security_groups.get(name="other_group")
api.network.security_group_delete(IsA(http.HttpRequest), sec_group.id)
self.mox.ReplayAll()
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
req = self.factory.post(INDEX_URL, form_data)
table = tables.SecurityGroupsTable(req, self.q_secgroups.list())
table = tables.SecurityGroupsTable(req, self.security_groups.list())
handled = table.maybe_handle()
self.assertEqual(strip_absolute_base(handled['location']),
INDEX_URL)
@test.create_stubs({api.network: ('security_group_delete',)})
def test_delete_group_exception(self):
sec_group = self.q_secgroups.get(name="other_group")
sec_group = self.security_groups.get(name="other_group")
api.network.security_group_delete(
IsA(http.HttpRequest),
@ -748,7 +748,7 @@ class SecurityGroupsViewTests(test.TestCase):
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
req = self.factory.post(INDEX_URL, form_data)
table = tables.SecurityGroupsTable(req, self.q_secgroups.list())
table = tables.SecurityGroupsTable(req, self.security_groups.list())
handled = table.maybe_handle()
self.assertEqual(strip_absolute_base(handled['location']),
@ -757,9 +757,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_custom_protocol(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv6',
@ -783,9 +783,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_egress(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'egress', 'IPv4',
@ -809,9 +809,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_egress_with_all_tcp(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.list()[3]
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.list()[3]
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'egress', 'IPv4',
@ -837,8 +837,8 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_source_group_with_direction_ethertype(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_rule_create(
@ -896,7 +896,7 @@ class SecurityGroupsViewTests(test.TestCase):
OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False})
@test.create_stubs({api.network: ('security_group_list',)})
def test_add_rule_cidr_with_ipv6_disabled(self):
sec_group = self.q_secgroups.first()
sec_group = self.security_groups.first()
self.mox.ReplayAll()
@ -915,9 +915,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_list',)})
def test_detail_add_rule_invalid_port(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.first()
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
@ -941,9 +941,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_ingress_tcp_without_port(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.list()[3]
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.list()[3]
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv4',
@ -968,9 +968,9 @@ class SecurityGroupsViewTests(test.TestCase):
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_custom_without_protocol(self):
sec_group = self.q_secgroups.first()
sec_group_list = self.q_secgroups.list()
rule = self.q_secgroup_rules.list()[3]
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.list()[3]
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv4',

View File

@ -22,8 +22,6 @@ import six
from oslo_utils import uuidutils
from novaclient.v2 import floating_ip_pools
from openstack_dashboard import api
from openstack_dashboard.test import helpers as test
@ -33,21 +31,17 @@ class NetworkClientTestCase(test.APITestCase):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(False)
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
self.assertIsInstance(nc.floating_ips, api.nova.FloatingIpManager)
self.assertIsInstance(nc.secgroups, api.nova.SecurityGroupManager)
self.assertIsNone(nc.floating_ips)
self.assertIsNone(nc.secgroups)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_networkclient_neutron(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
self.neutronclient = self.stub_neutronclient()
api.neutron.is_extension_supported(IsA(http.HttpRequest),
@ -59,172 +53,17 @@ class NetworkClientTestCase(test.APITestCase):
self.assertIsInstance(nc.secgroups, api.neutron.SecurityGroupManager)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_networkclient_neutron_with_nova_security_group(self):
def test_networkclient_neutron_without_security_group(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(False)
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
self.assertIsInstance(nc.floating_ips, api.neutron.FloatingIpManager)
self.assertIsInstance(nc.secgroups, api.nova.SecurityGroupManager)
class NetworkApiNovaTestBase(test.APITestCase):
def setUp(self):
super(NetworkApiNovaTestBase, self).setUp()
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(False)
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
class NetworkApiNovaSecurityGroupTests(NetworkApiNovaTestBase):
def test_server_update_security_groups(self):
all_secgroups = self.security_groups.list()
added_secgroup = all_secgroups[2]
rm_secgroup = all_secgroups[0]
cur_secgroups = all_secgroups[0:2]
new_sg_ids = [sg.id for sg in all_secgroups[1:3]]
instance_id = self.servers.first().id
novaclient = self.stub_novaclient()
novaclient.security_groups = self.mox.CreateMockAnything()
novaclient.servers = self.mox.CreateMockAnything()
novaclient.client = self.mox.CreateMockAnything()
novaclient.security_groups.list().AndReturn(all_secgroups)
novaclient.servers.list_security_group(instance_id) \
.AndReturn(cur_secgroups)
novaclient.servers.add_security_group(instance_id, added_secgroup.name)
novaclient.servers.remove_security_group(instance_id, rm_secgroup.name)
self.mox.ReplayAll()
api.network.server_update_security_groups(
self.request, instance_id, new_sg_ids)
class NetworkApiNovaFloatingIpTests(NetworkApiNovaTestBase):
def test_floating_ip_pools_list(self):
pool_names = ['pool1', 'pool2']
pools = [floating_ip_pools.FloatingIPPool(
None, {'name': pool}) for pool in pool_names]
novaclient = self.stub_novaclient()
novaclient.floating_ip_pools = self.mox.CreateMockAnything()
novaclient.floating_ip_pools.list().AndReturn(pools)
self.mox.ReplayAll()
ret = api.network.floating_ip_pools_list(self.request)
self.assertEqual(pool_names, [p.name for p in ret])
def test_floating_ip_list(self):
fips = self.api_floating_ips.list()
novaclient = self.stub_novaclient()
novaclient.floating_ips = self.mox.CreateMockAnything()
novaclient.floating_ips.list(all_tenants=False).AndReturn(fips)
self.mox.ReplayAll()
ret = api.network.tenant_floating_ip_list(self.request)
for r, e in zip(ret, fips):
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'instance_id']:
self.assertEqual(getattr(e, attr), getattr(r, attr))
self.assertEqual(e.instance_id, r.port_id)
exp_instance_type = 'compute' if e.instance_id else None
self.assertEqual(exp_instance_type, r.instance_type)
def test_floating_ip_get(self):
fip = self.api_floating_ips.first()
novaclient = self.stub_novaclient()
novaclient.floating_ips = self.mox.CreateMockAnything()
novaclient.floating_ips.get(fip.id).AndReturn(fip)
self.mox.ReplayAll()
ret = api.network.tenant_floating_ip_get(self.request, fip.id)
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'instance_id']:
self.assertEqual(getattr(fip, attr), getattr(ret, attr))
self.assertEqual(fip.instance_id, ret.port_id)
self.assertEqual(fip.instance_id, ret.instance_id)
self.assertEqual('compute', ret.instance_type)
def test_floating_ip_allocate(self):
pool_name = 'fip_pool'
fip = [fip for fip in self.api_floating_ips.list()
if not fip.instance_id][0]
novaclient = self.stub_novaclient()
novaclient.floating_ips = self.mox.CreateMockAnything()
novaclient.floating_ips.create(pool=pool_name).AndReturn(fip)
self.mox.ReplayAll()
ret = api.network.tenant_floating_ip_allocate(self.request, pool_name)
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'instance_id']:
self.assertEqual(getattr(fip, attr), getattr(ret, attr))
self.assertIsNone(ret.port_id)
self.assertIsNone(ret.instance_type)
def test_floating_ip_release(self):
fip = self.api_floating_ips.first()
novaclient = self.stub_novaclient()
novaclient.floating_ips = self.mox.CreateMockAnything()
novaclient.floating_ips.delete(fip.id)
self.mox.ReplayAll()
api.network.tenant_floating_ip_release(self.request, fip.id)
def test_floating_ip_associate(self):
server = api.nova.Server(self.servers.first(), self.request)
floating_ip = self.floating_ips.first()
novaclient = self.stub_novaclient()
novaclient.floating_ips = self.mox.CreateMockAnything()
novaclient.servers = self.mox.CreateMockAnything()
novaclient.servers.get(server.id).AndReturn(server)
novaclient.floating_ips.get(floating_ip.id).AndReturn(floating_ip)
novaclient.servers.add_floating_ip(server.id, floating_ip.ip) \
.AndReturn(server)
self.mox.ReplayAll()
api.network.floating_ip_associate(self.request,
floating_ip.id,
server.id)
def test_floating_ip_disassociate(self):
server = api.nova.Server(self.servers.first(), self.request)
floating_ip = self.api_floating_ips.first()
novaclient = self.stub_novaclient()
novaclient.servers = self.mox.CreateMockAnything()
novaclient.floating_ips = self.mox.CreateMockAnything()
novaclient.servers.get(server.id).AndReturn(server)
novaclient.floating_ips.get(floating_ip.id).AndReturn(floating_ip)
novaclient.servers.remove_floating_ip(server.id, floating_ip.ip) \
.AndReturn(server)
self.mox.ReplayAll()
api.network.floating_ip_disassociate(self.request,
floating_ip.id)
def test_floating_ip_target_list(self):
servers = self.servers.list()
novaclient = self.stub_novaclient()
novaclient.servers = self.mox.CreateMockAnything()
novaclient.servers.list().AndReturn(servers)
self.mox.ReplayAll()
targets = api.network.floating_ip_target_list(self.request)
for target, server in zip(targets, servers):
self.assertEqual(server.id, target.id)
self.assertEqual('%s (%s)' % (server.name, server.id), target.name)
def test_floating_ip_target_get_by_instance(self):
self.mox.ReplayAll()
instance_id = self.servers.first().id
ret = api.network.floating_ip_target_get_by_instance(self.request,
instance_id)
self.assertEqual(instance_id, ret)
self.assertIsNone(nc.secgroups)
class NetworkApiNeutronTestBase(test.APITestCase):
@ -251,7 +90,7 @@ class NetworkApiNeutronTests(NetworkApiNeutronTestBase):
'OS-EXT-IPS:type': 'fixed'})
if no_fip_expected:
continue
fips = self.q_floating_ips.filter(port_id=p['id'])
fips = self.floating_ips.filter(port_id=p['id'])
if not fips:
continue
# Only one FIP should match.
@ -281,7 +120,7 @@ class NetworkApiNeutronTests(NetworkApiNeutronTestBase):
if p['device_id'] in server_ids]
server_port_ids = [p['id'] for p in server_ports]
if router_enabled:
assoc_fips = [fip for fip in self.api_q_floating_ips.list()
assoc_fips = [fip for fip in self.api_floating_ips.list()
if fip['port_id'] in server_port_ids]
server_network_ids = [p['network_id'] for p in server_ports]
server_networks = [net for net in self.api_networks.list()
@ -348,10 +187,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
super(NetworkApiNeutronSecurityGroupTests, self).setUp()
self.qclient.list_extensions() \
.AndReturn({'extensions': self.api_extensions.list()})
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
self.sg_dict = dict([(sg['id'], sg['name']) for sg
in self.api_q_secgroups.list()])
in self.api_security_groups.list()])
def _cmp_sg_rule(self, exprule, retrule):
self.assertEqual(exprule['id'], retrule.id)
@ -385,11 +222,11 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_list(self):
sgs = self.api_q_secgroups.list()
sgs = self.api_security_groups.list()
tenant_id = self.request.user.tenant_id
# api.neutron.is_extension_supported(self.request, 'security-group').\
# AndReturn(True)
# use deepcopy to ensure self.api_q_secgroups is not modified.
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.list_security_groups(tenant_id=tenant_id) \
.AndReturn({'security_groups': copy.deepcopy(sgs)})
self.mox.ReplayAll()
@ -401,16 +238,16 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_get(self):
secgroup = self.api_q_secgroups.first()
secgroup = self.api_security_groups.first()
sg_ids = set([secgroup['id']] +
[rule['remote_group_id'] for rule
in secgroup['security_group_rules']
if rule['remote_group_id']])
related_sgs = [sg for sg in self.api_q_secgroups.list()
related_sgs = [sg for sg in self.api_security_groups.list()
if sg['id'] in sg_ids]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
# use deepcopy to ensure self.api_q_secgroups is not modified.
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.show_security_group(secgroup['id']) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.qclient.list_security_groups(id=sg_ids, fields=['id', 'name']) \
@ -421,7 +258,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_create(self):
secgroup = self.api_q_secgroups.list()[1]
secgroup = self.api_security_groups.list()[1]
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description'],
@ -437,7 +274,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_update(self):
secgroup = self.api_q_secgroups.list()[1]
secgroup = self.api_security_groups.list()[1]
secgroup = copy.deepcopy(secgroup)
secgroup['name'] = 'newname'
secgroup['description'] = 'new description'
@ -457,7 +294,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_delete(self):
secgroup = self.api_q_secgroups.first()
secgroup = self.api_security_groups.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_security_group(secgroup['id'])
@ -466,10 +303,10 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_rule_create(self):
sg_rule = [r for r in self.api_q_secgroup_rules.list()
sg_rule = [r for r in self.api_security_group_rules.list()
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
sg_id = sg_rule['security_group_id']
secgroup = [sg for sg in self.api_q_secgroups.list()
secgroup = [sg for sg in self.api_security_groups.list()
if sg['id'] == sg_id][0]
post_rule = copy.deepcopy(sg_rule)
@ -494,7 +331,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_rule_delete(self):
sg_rule = self.api_q_secgroup_rules.first()
sg_rule = self.api_security_group_rules.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_security_group_rule(sg_rule['id'])
@ -515,11 +352,11 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
return (instance_id, instance_ports)
def test_server_security_groups(self):
cur_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
cur_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
secgroups = copy.deepcopy(self.api_q_secgroups.list())
secgroups = copy.deepcopy(self.api_security_groups.list())
self.qclient.list_security_groups(id=set(cur_sg_ids)) \
.AndReturn({'security_groups': secgroups})
self.mox.ReplayAll()
@ -527,8 +364,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
api.network.server_security_groups(self.request, instance_id)
def test_server_update_security_groups(self):
cur_sg_ids = [self.api_q_secgroups.first()['id']]
new_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
cur_sg_ids = [self.api_security_groups.first()['id']]
new_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
@ -546,8 +383,6 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
super(NetworkApiNeutronFloatingIpTests, self).setUp()
self.qclient.list_extensions() \
.AndReturn({'extensions': self.api_extensions.list()})
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_router': True})
def test_floating_ip_supported(self):
@ -576,7 +411,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
[getattr(p, attr) for p in rets])
def test_floating_ip_list(self):
fips = self.api_q_floating_ips.list()
fips = self.api_floating_ips.list()
filters = {'tenant_id': self.request.user.tenant_id}
self.qclient.list_floatingips(**filters) \
@ -600,7 +435,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self.assertIsNone(ret.instance_type)
def test_floating_ip_list_all_tenants(self):
fips = self.api_q_floating_ips.list()
fips = self.api_floating_ips.list()
self.qclient.list_floatingips().AndReturn({'floatingips': fips})
self.qclient.list_ports().AndReturn({'ports': self.api_ports.list()})
self.mox.ReplayAll()
@ -629,7 +464,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self.assertIsNone(ret.instance_type)
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
fip = self.api_q_floating_ips.list()[1]
fip = self.api_floating_ips.list()[1]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.qclient.show_port(assoc_port['id']) \
@ -654,7 +489,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self._test_floating_ip_get_associated(assoc_port, 'loadbalancer')
def test_floating_ip_get_unassociated(self):
fip = self.api_q_floating_ips.list()[0]
fip = self.api_floating_ips.list()[0]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.mox.ReplayAll()
@ -669,7 +504,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
ext_net = ext_nets[0]
fip = self.api_q_floating_ips.first()
fip = self.api_floating_ips.first()
self.qclient.create_floatingip(
{'floatingip': {'floating_network_id': ext_net['id'],
'tenant_id': self.request.user.project_id}}) \
@ -685,7 +520,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_release(self):
fip = self.api_q_floating_ips.first()
fip = self.api_floating_ips.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_floatingip(fip['id'])
@ -694,7 +529,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
api.network.tenant_floating_ip_release(self.request, fip['id'])
def test_floating_ip_associate(self):
fip = self.api_q_floating_ips.list()[1]
fip = self.api_floating_ips.list()[1]
assoc_port = self.api_ports.list()[1]
ip_address = assoc_port['fixed_ips'][0]['ip_address']
target_id = '%s_%s' % (assoc_port['id'], ip_address)
@ -707,7 +542,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
api.network.floating_ip_associate(self.request, fip['id'], target_id)
def test_floating_ip_disassociate(self):
fip = self.api_q_floating_ips.list()[1]
fip = self.api_floating_ips.list()[1]
self.qclient.update_floatingip(fip['id'],
{'floatingip': {'port_id': None}})

View File

@ -34,9 +34,9 @@ def data(TEST):
TEST.routers = utils.TestDataContainer()
TEST.routers_with_rules = utils.TestDataContainer()
TEST.routers_with_routes = utils.TestDataContainer()
TEST.q_floating_ips = utils.TestDataContainer()
TEST.q_secgroups = utils.TestDataContainer()
TEST.q_secgroup_rules = utils.TestDataContainer()
TEST.floating_ips = utils.TestDataContainer()
TEST.security_groups = utils.TestDataContainer()
TEST.security_group_rules = utils.TestDataContainer()
TEST.providers = utils.TestDataContainer()
TEST.pools = utils.TestDataContainer()
TEST.vips = utils.TestDataContainer()
@ -61,9 +61,9 @@ def data(TEST):
TEST.api_ports = utils.TestDataContainer()
TEST.api_routers = utils.TestDataContainer()
TEST.api_routers_with_routes = utils.TestDataContainer()
TEST.api_q_floating_ips = utils.TestDataContainer()
TEST.api_q_secgroups = utils.TestDataContainer()
TEST.api_q_secgroup_rules = utils.TestDataContainer()
TEST.api_floating_ips = utils.TestDataContainer()
TEST.api_security_groups = utils.TestDataContainer()
TEST.api_security_group_rules = utils.TestDataContainer()
TEST.api_pools = utils.TestDataContainer()
TEST.api_vips = utils.TestDataContainer()
TEST.api_members = utils.TestDataContainer()
@ -390,11 +390,11 @@ def data(TEST):
'fixed_ip_address': None,
'port_id': None,
'router_id': None}
TEST.api_q_floating_ips.add(fip_dict)
TEST.api_floating_ips.add(fip_dict)
fip_with_instance = copy.deepcopy(fip_dict)
fip_with_instance.update({'instance_id': None,
'instance_type': None})
TEST.q_floating_ips.add(neutron.FloatingIp(fip_with_instance))
TEST.floating_ips.add(neutron.FloatingIp(fip_with_instance))
# Associated (with compute port on 1st network).
fip_dict = {'tenant_id': '1',
@ -404,11 +404,11 @@ def data(TEST):
'fixed_ip_address': assoc_port['fixed_ips'][0]['ip_address'],
'port_id': assoc_port['id'],
'router_id': router_dict['id']}
TEST.api_q_floating_ips.add(fip_dict)
TEST.api_floating_ips.add(fip_dict)
fip_with_instance = copy.deepcopy(fip_dict)
fip_with_instance.update({'instance_id': '1',
'instance_type': 'compute'})
TEST.q_floating_ips.add(neutron.FloatingIp(fip_with_instance))
TEST.floating_ips.add(neutron.FloatingIp(fip_with_instance))
# Security group.
@ -490,14 +490,14 @@ def data(TEST):
sg_name_dict = dict([(sg['id'], sg['name']) for sg in groups])
for sg in groups:
# Neutron API.
TEST.api_q_secgroups.add(sg)
TEST.api_security_groups.add(sg)
for rule in sg['security_group_rules']:
TEST.api_q_secgroup_rules.add(copy.copy(rule))
TEST.api_security_group_rules.add(copy.copy(rule))
# OpenStack Dashboard internaly API.
TEST.q_secgroups.add(
TEST.security_groups.add(
neutron.SecurityGroup(copy.deepcopy(sg), sg_name_dict))
for rule in sg['security_group_rules']:
TEST.q_secgroup_rules.add(
TEST.security_group_rules.add(
neutron.SecurityGroupRule(copy.copy(rule), sg_name_dict))
# Subnetpools

View File

@ -14,20 +14,15 @@
import json
from oslo_utils import uuidutils
from novaclient.v2 import aggregates
from novaclient.v2 import availability_zones
from novaclient.v2 import certs
from novaclient.v2 import flavor_access
from novaclient.v2 import flavors
from novaclient.v2 import floating_ips
from novaclient.v2 import hosts
from novaclient.v2 import hypervisors
from novaclient.v2 import keypairs
from novaclient.v2 import quotas
from novaclient.v2 import security_group_rules as rules
from novaclient.v2 import security_groups as sec_groups
from novaclient.v2 import server_groups
from novaclient.v2 import servers
from novaclient.v2 import services
@ -35,7 +30,6 @@ from novaclient.v2 import usage
from novaclient.v2 import volumes
from openstack_dashboard.api import base
from openstack_dashboard.api import nova
from openstack_dashboard.usage import quotas as usage_quotas
from openstack_dashboard.test.test_data import utils
@ -166,16 +160,10 @@ def data(TEST):
TEST.flavors = utils.TestDataContainer()
TEST.flavor_access = utils.TestDataContainer()
TEST.keypairs = utils.TestDataContainer()
TEST.security_groups = utils.TestDataContainer()
TEST.security_groups_uuid = utils.TestDataContainer()
TEST.security_group_rules = utils.TestDataContainer()
TEST.security_group_rules_uuid = utils.TestDataContainer()
TEST.volumes = utils.TestDataContainer()
TEST.quotas = utils.TestDataContainer()
TEST.quota_usages = utils.TestDataContainer()
TEST.disabled_quotas = utils.TestDataContainer()
TEST.floating_ips = utils.TestDataContainer()
TEST.floating_ips_uuid = utils.TestDataContainer()
TEST.usages = utils.TestDataContainer()
TEST.certs = utils.TestDataContainer()
TEST.availability_zones = utils.TestDataContainer()
@ -185,11 +173,6 @@ def data(TEST):
TEST.hosts = utils.TestDataContainer()
TEST.server_groups = utils.TestDataContainer()
# Data return by novaclient.
# It is used if API layer does data conversion.
TEST.api_floating_ips = utils.TestDataContainer()
TEST.api_floating_ips_uuid = utils.TestDataContainer()
# Volumes
volume = volumes.Volume(
volumes.VolumeManager(None),
@ -313,83 +296,6 @@ def data(TEST):
dict(name='keyName'))
TEST.keypairs.add(keypair)
# Security Groups and Rules
def generate_security_groups(is_uuid=False):
def get_id(is_uuid):
global current_int_id
if is_uuid:
return uuidutils.generate_uuid()
else:
get_id.current_int_id += 1
return get_id.current_int_id
get_id.current_int_id = 0
sg_manager = sec_groups.SecurityGroupManager(None)
rule_manager = rules.SecurityGroupRuleManager(None)
sec_group_1 = sec_groups.SecurityGroup(sg_manager,
{"rules": [],
"tenant_id": TEST.tenant.id,
"id": get_id(is_uuid),
"name": u"default",
"description": u"default"})
sec_group_2 = sec_groups.SecurityGroup(sg_manager,
{"rules": [],
"tenant_id": TEST.tenant.id,
"id": get_id(is_uuid),
"name": u"other_group",
"description": u"NotDefault."})
sec_group_3 = sec_groups.SecurityGroup(sg_manager,
{"rules": [],
"tenant_id": TEST.tenant.id,
"id": get_id(is_uuid),
"name": u"another_group",
"description": u"NotDefault."})
rule = {'id': get_id(is_uuid),
'group': {},
'ip_protocol': u"tcp",
'from_port': u"80",
'to_port': u"80",
'parent_group_id': sec_group_1.id,
'ip_range': {'cidr': u"0.0.0.0/32"}}
icmp_rule = {'id': get_id(is_uuid),
'group': {},
'ip_protocol': u"icmp",
'from_port': u"9",
'to_port': u"5",
'parent_group_id': sec_group_1.id,
'ip_range': {'cidr': u"0.0.0.0/32"}}
group_rule = {'id': 3,
'group': {},
'ip_protocol': u"tcp",
'from_port': u"80",
'to_port': u"80",
'parent_group_id': sec_group_1.id,
'source_group_id': sec_group_1.id}
rule_obj = rules.SecurityGroupRule(rule_manager, rule)
rule_obj2 = rules.SecurityGroupRule(rule_manager, icmp_rule)
rule_obj3 = rules.SecurityGroupRule(rule_manager, group_rule)
sec_group_1.rules = [rule_obj]
sec_group_2.rules = [rule_obj]
return {"rules": [rule_obj, rule_obj2, rule_obj3],
"groups": [sec_group_1, sec_group_2, sec_group_3]}
sg_data = generate_security_groups()
TEST.security_group_rules.add(*sg_data["rules"])
TEST.security_groups.add(*sg_data["groups"])
sg_uuid_data = generate_security_groups(is_uuid=True)
TEST.security_group_rules_uuid.add(*sg_uuid_data["rules"])
TEST.security_groups_uuid.add(*sg_uuid_data["groups"])
# Quota Sets
quota_data = dict(metadata_items='1',
injected_file_content_bytes='1',
@ -498,51 +404,6 @@ def data(TEST):
u'type': u'rdp'}}
TEST.servers.rdp_console_data = console
# Floating IPs
def generate_fip(conf):
return floating_ips.FloatingIP(floating_ips.FloatingIPManager(None),
conf)
fip_1 = {'id': 1,
'fixed_ip': '10.0.0.4',
'instance_id': server_1.id,
'ip': '58.58.58.58',
'pool': 'pool1'}
fip_2 = {'id': 2,
'fixed_ip': None,
'instance_id': None,
'ip': '58.58.58.58',
'pool': 'pool2'}
# this floating ip is for lbaas tests
fip_3 = {'id': 3,
'fixed_ip': '10.0.0.5',
# the underlying class maps the instance id to port id
'instance_id': '063cf7f3-ded1-4297-bc4c-31eae876cc91',
'ip': '58.58.58.58',
'pool': 'pool2'}
TEST.api_floating_ips.add(generate_fip(fip_1), generate_fip(fip_2),
generate_fip(fip_3))
TEST.floating_ips.add(nova.FloatingIp(generate_fip(fip_1)),
nova.FloatingIp(generate_fip(fip_2)),
nova.FloatingIp(generate_fip(fip_3)))
# Floating IP with UUID id (for Floating IP with Neutron Proxy)
fip_3 = {'id': uuidutils.generate_uuid(),
'fixed_ip': '10.0.0.4',
'instance_id': server_1.id,
'ip': '58.58.58.58',
'pool': 'pool1'}
fip_4 = {'id': uuidutils.generate_uuid(),
'fixed_ip': None,
'instance_id': None,
'ip': '58.58.58.58',
'pool': 'pool2'}
TEST.api_floating_ips_uuid.add(generate_fip(fip_3), generate_fip(fip_4))
TEST.floating_ips_uuid.add(nova.FloatingIp(generate_fip(fip_3)),
nova.FloatingIp(generate_fip(fip_4)))
# Usage
usage_vals = {"tenant_id": TEST.tenant.id,
"instance_name": server_1.name,