Merge "Do global CIDR check if overlapping IPs disabled." into milestone-proposed

This commit is contained in:
Jenkins 2012-09-25 19:36:20 +00:00 committed by Gerrit Code Review
commit 22993a0d4f
6 changed files with 58 additions and 22 deletions

View File

@ -44,6 +44,9 @@ api_paste_config = api-paste.ini
# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# Enable or disable overlapping IPs for subnets
# allow_overlapping_ips = False
# RPC configuration options. Defined in rpc __init__
# The messaging module to use, defaults to kombu.
# rpc_backend = quantum.openstack.common.rpc.impl_kombu

View File

@ -50,6 +50,7 @@ core_opts = [
cfg.IntOpt('max_subnet_host_routes', default=20),
cfg.StrOpt('state_path', default='.'),
cfg.IntOpt('dhcp_lease_duration', default=120),
cfg.BoolOpt('allow_overlapping_ips', default=False)
]
# Register the configuration options

View File

@ -181,18 +181,17 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
return []
def _get_route_by_subnet(self, context, subnet_id):
try:
route_qry = context.session.query(models_v2.Route)
return route_qry.filter_by(subnet_id=subnet_id).all()
except exc.NoResultFound:
return []
route_qry = context.session.query(models_v2.Route)
return route_qry.filter_by(subnet_id=subnet_id).all()
def _get_subnets_by_network(self, context, network_id):
try:
subnet_qry = context.session.query(models_v2.Subnet)
return subnet_qry.filter_by(network_id=network_id).all()
except exc.NoResultFound:
return []
subnet_qry = context.session.query(models_v2.Subnet)
return subnet_qry.filter_by(network_id=network_id).all()
def _get_all_subnets(self, context):
# NOTE(salvatore-orlando): This query might end up putting
# a lot of stress on the db. Consider adding a cache layer
return context.session.query(models_v2.Subnet).all()
def _fields(self, resource, fields):
if fields:
@ -634,22 +633,27 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
'subnet_id': result['subnet_id']})
return ips
def _validate_subnet_cidr(self, network, new_subnet_cidr):
def _validate_subnet_cidr(self, context, network, new_subnet_cidr):
"""Validate the CIDR for a subnet.
Verifies the specified CIDR does not overlap with the ones defined
for the other subnets specified for this network.
for the other subnets specified for this network, or with any other
CIDR if overlapping IPs are disabled.
"""
for subnet in network.subnets:
if (netaddr.IPSet([subnet.cidr]) &
netaddr.IPSet([new_subnet_cidr])):
err_msg = ("Requested subnet with cidr: %s "
"for network: %s "
"overlaps with subnet: %s)" % (new_subnet_cidr,
network.id,
subnet.cidr))
LOG.error(err_msg)
new_subnet_ipset = netaddr.IPSet([new_subnet_cidr])
subnet_list = network.subnets
if not cfg.CONF.allow_overlapping_ips:
subnet_list = self._get_all_subnets(context)
for subnet in subnet_list:
if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset):
# don't give out details of the overlapping subnet
err_msg = _("Requested subnet with cidr: %s "
"for network: %s overlaps with another subnet" %
(new_subnet_cidr, network.id))
LOG.error("Validation for CIDR:%s failed - overlaps with "
"subnet %s (CIDR:%s)",
new_subnet_cidr, subnet.id, subnet.cidr)
raise q_exc.InvalidInput(error_message=err_msg)
def _validate_allocation_pools(self, ip_pools, gateway_ip, subnet_cidr):
@ -987,7 +991,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
tenant_id = self._get_tenant_id_for_create(context, s)
with context.session.begin(subtransactions=True):
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
self._validate_subnet_cidr(context, network, s['cidr'])
# The 'shared' attribute for subnets is for internal plugin
# use only. It is not exposed through the API
subnet = models_v2.Subnet(tenant_id=tenant_id,

View File

@ -227,6 +227,8 @@ class MetaQuantumPluginV2Test(unittest.TestCase):
self.plugin.delete_network(self.context, network_ret3['id'])
def test_create_delete_subnet(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
network1 = self._fake_network('fake1')
network_ret1 = self.plugin.create_network(self.context, network1)
network2 = self._fake_network('fake2')

View File

@ -712,6 +712,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self._validate_behavior_on_bulk_failure(res, 'ports')
def test_list_ports(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json')
port_list = self.deserialize('json', req.get_response(self.api))
@ -721,6 +723,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertTrue(port2['port']['id'] in ids)
def test_list_ports_filtered_by_fixed_ip(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(), self.port()) as (port1, port2):
fixed_ips = port1['port']['fixed_ips'][0]
query_params = """
@ -1749,6 +1753,26 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
pass
self.assertEquals(ctx_manager.exception.code, 400)
def test_create_2_subnets_overlapping_cidr_allowed_returns_200(self):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', True)
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
def test_create_2_subnets_overlapping_cidr_not_allowed_returns_400(self):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', False)
with self.assertRaises(
webob.exc.HTTPClientError) as ctx_manager:
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
self.assertEquals(ctx_manager.exception.code, 400)
def test_create_subnets_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")

View File

@ -287,6 +287,8 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
def setUp(self):
test_config['plugin_name_v2'] = (
'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = L3TestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(L3NatDBTestCase, self).setUp()