Integration of IPAllocationPool

This patch integrates the Oslo-Versioned Object created for
IPAllocationPool model class.

Change-Id: Ifca5be29d322f1e85c45ca77371df37c9bf5b27f
Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
This commit is contained in:
Victor Morales 2016-11-11 11:30:08 -06:00 committed by Ihar Hrachyshka
parent d20c0edfc6
commit daf540781e
4 changed files with 43 additions and 25 deletions

View File

@ -165,16 +165,14 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
@db_api.context_manager.writer
def _update_subnet_allocation_pools(self, context, subnet_id, s):
context.session.query(models_v2.IPAllocationPool).filter_by(
subnet_id=subnet_id).delete()
subnet_obj.IPAllocationPool.delete_objects(context,
subnet_id=subnet_id)
pools = [(netaddr.IPAddress(p.first, p.version).format(),
netaddr.IPAddress(p.last, p.version).format())
for p in s['allocation_pools']]
new_pools = [models_v2.IPAllocationPool(first_ip=p[0],
last_ip=p[1],
subnet_id=subnet_id)
for p in pools]
context.session.add_all(new_pools)
for p in pools:
subnet_obj.IPAllocationPool(context, start=p[0], end=p[1],
subnet_id=subnet_id).create()
# Gather new pools for result
result_pools = [{'start': p[0], 'end': p[1]} for p in pools]

View File

@ -32,6 +32,7 @@ from neutron.db import ipam_backend_mixin
from neutron.db import models_v2
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
from neutron.objects import subnet as obj_subnet
LOG = logging.getLogger(__name__)
@ -327,10 +328,9 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
for pool in allocation_pools:
first_ip = str(netaddr.IPAddress(pool.first, pool.version))
last_ip = str(netaddr.IPAddress(pool.last, pool.version))
ip_pool = models_v2.IPAllocationPool(subnet=subnet,
first_ip=first_ip,
last_ip=last_ip)
context.session.add(ip_pool)
obj_subnet.IPAllocationPool(
context, subnet_id=subnet['id'], start=first_ip,
end=last_ip).create()
def update_port_with_ips(self, context, host, db_port, new_port, new_mac):
changes = self.Changes(add=[], original=[], remove=[])

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as n_exc
@ -22,6 +23,7 @@ import testtools
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db import models_v2
from neutron.objects import subnet as subnet_obj
from neutron.tests.unit import testlib_api
@ -68,8 +70,8 @@ class IpamTestCase(testlib_api.SqlTestCase):
self.assertEqual(expected, actual)
def assert_ip_alloc_pool_matches(self, expected):
result_set = self.cxt.session.query(models_v2.IPAllocationPool).all()
keys = ['first_ip', 'last_ip', 'subnet_id']
result_set = subnet_obj.IPAllocationPool.get_objects(self.cxt)
keys = ['start', 'end', 'subnet_id']
actual = self.result_set_to_dicts(result_set, keys)
self.assertEqual(expected, actual)
@ -120,8 +122,8 @@ class IpamTestCase(testlib_api.SqlTestCase):
'ip_address': fixed_ip[0].get('ip_address'),
'subnet_id': self.subnet_id,
'network_id': self.network_id}]
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.6',
ip_alloc_pool_expected = [{'start': netaddr.IPAddress('10.10.10.2'),
'end': netaddr.IPAddress('10.10.10.6'),
'subnet_id': self.subnet_id}]
self.assert_ip_alloc_matches(ip_alloc_expected)
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
@ -131,8 +133,8 @@ class IpamTestCase(testlib_api.SqlTestCase):
for i in range(1, 6):
self._create_port(uuidutils.generate_uuid())
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.6',
ip_alloc_pool_expected = [{'start': netaddr.IPAddress('10.10.10.2'),
'end': netaddr.IPAddress('10.10.10.6'),
'subnet_id': self.subnet_id}]
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
with testtools.ExpectedException(n_exc.IpAddressGenerationFailure):

View File

@ -17,6 +17,7 @@ import copy
import mock
import netaddr
from neutron_lib import constants
from neutron_lib import context as ncontext
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -29,6 +30,7 @@ from neutron.db import ipam_backend_mixin
from neutron.db import ipam_pluggable_backend
from neutron.db import models_v2
from neutron.ipam import requests as ipam_req
from neutron.objects import subnet as obj_subnet
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_db_base
@ -65,6 +67,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
super(TestDbBasePluginIpam, self).setUp()
self.tenant_id = uuidutils.generate_uuid()
self.subnet_id = uuidutils.generate_uuid()
self.admin_context = ncontext.get_admin_context()
def _prepare_mocks(self, address_factory=None, subnet_factory=None):
if address_factory is None:
@ -710,13 +713,14 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
def _test_update_db_subnet(self, pool_mock, subnet, expected_subnet,
old_pools):
subnet_factory = mock.Mock()
context = mock.Mock()
context = self.admin_context
mocks = self._prepare_mocks_with_pool_mock(
pool_mock, subnet_factory=subnet_factory)
mocks['ipam'] = ipam_pluggable_backend.IpamPluggableBackend()
mocks['ipam'].update_db_subnet(context, id, subnet, old_pools)
mocks['ipam'].update_db_subnet(
context, subnet['id'], subnet, old_pools)
mocks['driver'].get_subnet_request_factory.assert_called_once_with()
subnet_factory.get_request.assert_called_once_with(context,
@ -725,13 +729,17 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
@mock.patch('neutron.ipam.driver.Pool')
def test_update_db_subnet_unchanged_pools(self, pool_mock):
old_pools = [netaddr.IPRange('192.1.1.2', '192.1.1.254')]
old_pools = [{'start': '192.1.1.2', 'end': '192.1.1.254'}]
context = self.admin_context
subnet = {'id': uuidutils.generate_uuid(),
'network_id': uuidutils.generate_uuid(),
'ip_version': '4',
'cidr': '192.1.1.0/24',
'ipv6_address_mode': None,
'ipv6_ra_mode': None}
subnet_with_pools = subnet.copy()
subnet_db = models_v2.Subnet(**subnet_with_pools)
context.session.add(subnet_db)
context.session.flush()
subnet_with_pools['allocation_pools'] = old_pools
# if subnet has no allocation pools set, then old pools has to
# be added to subnet dict passed to request factory
@ -740,17 +748,27 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
@mock.patch('neutron.ipam.driver.Pool')
def test_update_db_subnet_new_pools(self, pool_mock):
old_pools = [netaddr.IPRange('192.1.1.2', '192.1.1.254')]
old_pools = [{'start': '192.1.1.2', 'end': '192.1.1.254'}]
context = self.admin_context
subnet = {'id': uuidutils.generate_uuid(),
'network_id': uuidutils.generate_uuid(),
'ip_version': '4',
'cidr': '192.1.1.0/24',
'allocation_pools': [
netaddr.IPRange('192.1.1.10', '192.1.1.254')],
'ipv6_address_mode': None,
'ipv6_ra_mode': None}
# make a copy of subnet for validation, since update_subnet changes
# incoming subnet dict
expected_subnet = subnet.copy()
subnet_db = models_v2.Subnet(**subnet)
context.session.add(subnet_db)
context.session.flush()
subnet['allocation_pools'] = [
netaddr.IPRange('192.1.1.10', '192.1.1.254')]
expected_subnet = subnet.copy()
obj_subnet.IPAllocationPool(context,
subnet_id=subnet['id'],
start='192.1.1.10',
end='192.1.1.254').create()
context.session.refresh(subnet_db)
# validate that subnet passed to request factory is the same as
# incoming one, i.e. new pools in it are not overwritten by old pools
self._test_update_db_subnet(pool_mock, subnet, expected_subnet,