177 lines
6.9 KiB
Python
177 lines
6.9 KiB
Python
# Copyright 2015 SUSE Linux Products GmbH
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from neutron_lib import constants
|
|
from neutron_lib import exceptions as n_exc
|
|
from oslo_config import cfg
|
|
import testtools
|
|
|
|
from neutron import context
|
|
from neutron.db import db_base_plugin_v2 as base_plugin
|
|
from neutron.db import models_v2
|
|
from neutron.ipam.drivers.neutrondb_ipam import db_models as ipam_models
|
|
from neutron.tests.unit import testlib_api
|
|
|
|
|
|
# required in order for testresources to optimize same-backend
|
|
# tests together
|
|
load_tests = testlib_api.module_load_tests
|
|
# FIXME(zzzeek): needs to be provided by oslo.db, current version
|
|
# is not working
|
|
# load_tests = test_base.optimize_db_test_loader(__file__)
|
|
|
|
|
|
class IpamTestCase(testlib_api.SqlTestCase):
|
|
"""
|
|
Base class for tests that aim to test ip allocation.
|
|
"""
|
|
use_pluggable_ipam = False
|
|
|
|
def setUp(self):
|
|
super(IpamTestCase, self).setUp()
|
|
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
|
|
if self.use_pluggable_ipam:
|
|
self._turn_on_pluggable_ipam()
|
|
else:
|
|
self._turn_off_pluggable_ipam()
|
|
self.plugin = base_plugin.NeutronDbPluginV2()
|
|
self.cxt = context.Context(user_id=None,
|
|
tenant_id=None,
|
|
is_admin=True,
|
|
overwrite=False)
|
|
self.tenant_id = 'test_tenant'
|
|
self.network_id = 'test_net_id'
|
|
self.subnet_id = 'test_sub_id'
|
|
self.port_id = 'test_p_id'
|
|
self._create_network()
|
|
self._create_subnet()
|
|
|
|
def _turn_off_pluggable_ipam(self):
|
|
cfg.CONF.set_override('ipam_driver', None)
|
|
self.ip_availability_range = models_v2.IPAvailabilityRange
|
|
|
|
def _turn_on_pluggable_ipam(self):
|
|
cfg.CONF.set_override('ipam_driver', 'internal')
|
|
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
|
|
self.setup_coreplugin(DB_PLUGIN_KLASS)
|
|
self.ip_availability_range = ipam_models.IpamAvailabilityRange
|
|
|
|
def result_set_to_dicts(self, resultset, keys):
|
|
dicts = []
|
|
for item in resultset:
|
|
item_dict = dict((x, item[x]) for x in keys)
|
|
dicts.append(item_dict)
|
|
return dicts
|
|
|
|
def assert_ip_alloc_matches(self, expected):
|
|
result_set = self.cxt.session.query(models_v2.IPAllocation).all()
|
|
keys = ['port_id', 'ip_address', 'subnet_id', 'network_id']
|
|
actual = self.result_set_to_dicts(result_set, keys)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def assert_ip_avail_range_matches(self, expected):
|
|
result_set = self.cxt.session.query(
|
|
self.ip_availability_range).all()
|
|
keys = ['first_ip', 'last_ip']
|
|
actual = self.result_set_to_dicts(result_set, keys)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def assert_ip_alloc_pool_matches(self, expected):
|
|
result_set = self.cxt.session.query(models_v2.IPAllocationPool).all()
|
|
keys = ['first_ip', 'last_ip', 'subnet_id']
|
|
actual = self.result_set_to_dicts(result_set, keys)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def _create_network(self):
|
|
network = {'tenant_id': self.tenant_id,
|
|
'id': self.network_id,
|
|
'name': 'test-net',
|
|
'admin_state_up': True,
|
|
'shared': False,
|
|
'status': constants.NET_STATUS_ACTIVE}
|
|
return self.plugin.create_network(self.cxt, {'network': network})
|
|
|
|
def _create_subnet(self):
|
|
subnet = {'tenant_id': self.tenant_id,
|
|
'id': self.subnet_id,
|
|
'name': 'test_sub',
|
|
'network_id': self.network_id,
|
|
'ip_version': 4,
|
|
'cidr': '10.10.10.0/29',
|
|
'enable_dhcp': False,
|
|
'gateway_ip': '10.10.10.1',
|
|
'shared': False,
|
|
'allocation_pools': constants.ATTR_NOT_SPECIFIED,
|
|
'dns_nameservers': constants.ATTR_NOT_SPECIFIED,
|
|
'host_routes': constants.ATTR_NOT_SPECIFIED}
|
|
return self.plugin.create_subnet(self.cxt, {'subnet': subnet})
|
|
|
|
def _create_port(self, port_id, fixed_ips=None):
|
|
port_fixed_ips = (fixed_ips if fixed_ips else
|
|
constants.ATTR_NOT_SPECIFIED)
|
|
port = {'tenant_id': self.tenant_id,
|
|
'name': 'test_port',
|
|
'id': port_id,
|
|
'network_id': self.network_id,
|
|
'mac_address': constants.ATTR_NOT_SPECIFIED,
|
|
'admin_state_up': True,
|
|
'status': constants.PORT_STATUS_ACTIVE,
|
|
'device_id': 'test_dev_id',
|
|
'device_owner': 'compute',
|
|
'fixed_ips': port_fixed_ips}
|
|
self.plugin.create_port(self.cxt, {'port': port})
|
|
|
|
def test_allocate_fixed_ip(self):
|
|
fixed_ip = [{'ip_address': "10.10.10.3", 'subnet_id': self.subnet_id}]
|
|
self._create_port(self.port_id, fixed_ip)
|
|
|
|
ip_alloc_expected = [{'port_id': self.port_id,
|
|
'ip_address': fixed_ip[0].get('ip_address'),
|
|
'subnet_id': self.subnet_id,
|
|
'network_id': self.network_id}]
|
|
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
|
|
'last_ip': '10.10.10.6',
|
|
'subnet_id': self.subnet_id}]
|
|
self.assert_ip_alloc_matches(ip_alloc_expected)
|
|
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
|
|
|
|
def test_allocate_ip_exausted_pool(self):
|
|
# available from .2 up to .6 -> 5
|
|
for i in range(1, 6):
|
|
self._create_port(self.port_id + str(i))
|
|
|
|
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
|
|
'last_ip': '10.10.10.6',
|
|
'subnet_id': self.subnet_id}]
|
|
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
|
|
with testtools.ExpectedException(n_exc.IpAddressGenerationFailure):
|
|
self._create_port(self.port_id)
|
|
|
|
|
|
class TestIpamMySql(testlib_api.MySQLTestCaseMixin, IpamTestCase):
|
|
pass
|
|
|
|
|
|
class TestIpamPsql(testlib_api.PostgreSQLTestCaseMixin, IpamTestCase):
|
|
pass
|
|
|
|
|
|
class TestPluggableIpamMySql(testlib_api.MySQLTestCaseMixin, IpamTestCase):
|
|
use_pluggable_ipam = True
|
|
|
|
|
|
class TestPluggableIpamPsql(testlib_api.PostgreSQLTestCaseMixin, IpamTestCase):
|
|
use_pluggable_ipam = True
|