From d755f7248d324bb4c44b3efc9d200f8eb075066d Mon Sep 17 00:00:00 2001 From: Pavel Bondar Date: Tue, 20 Oct 2015 19:11:30 +0300 Subject: [PATCH] Use compare-and-swap for IpamAvailabilityRange The existing locking mechanism, 'select for update', causes deadlocks with Galera multi-writers. Row locking is replaced with a compare-and-swap approach. Compare-and-swap verifies that a row has not been changed by another thread before updating/deleting it. Filter-and-update and filter-and-delete are used; they return the count of affected rows. If the count of affected rows is less than expected, then another thread has already changed our row and RetryRequest is raised. Change-Id: I514cae0fa43033433ec2982bcf3726e02e6692bf Closes-Bug: #1494351 --- neutron/ipam/drivers/neutrondb_ipam/db_api.py | 76 +++++++++++++------ neutron/ipam/drivers/neutrondb_ipam/driver.py | 69 ++++++++++------- neutron/ipam/exceptions.py | 8 ++ .../drivers/neutrondb_ipam/test_db_api.py | 61 ++++++++++++--- .../drivers/neutrondb_ipam/test_driver.py | 16 ++++ 5 files changed, 168 insertions(+), 62 deletions(-) diff --git a/neutron/ipam/drivers/neutrondb_ipam/db_api.py b/neutron/ipam/drivers/neutrondb_ipam/db_api.py index 223fb1c3484..dd73841a9ba 100644 --- a/neutron/ipam/drivers/neutrondb_ipam/db_api.py +++ b/neutron/ipam/drivers/neutrondb_ipam/db_api.py @@ -13,10 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. 
+from oslo_db import exception as db_exc from oslo_log import log from oslo_utils import uuidutils +from sqlalchemy.orm import exc as orm_exc from neutron.ipam.drivers.neutrondb_ipam import db_models +from neutron.ipam import exceptions as ipam_exc LOG = log.getLogger(__name__) # Database operations for Neutron's DB-backed IPAM driver @@ -103,45 +106,35 @@ class IpamSubnetManager(object): db_models.IpamAllocationPool).filter_by( ipam_subnet_id=self._ipam_subnet_id) - def _range_query(self, session, locking): - range_qry = session.query( + def _range_query(self, session): + return session.query( db_models.IpamAvailabilityRange).join( db_models.IpamAllocationPool).filter_by( ipam_subnet_id=self._ipam_subnet_id) - if locking: - range_qry = range_qry.with_lockmode('update') - return range_qry - def get_first_range(self, session, locking=False): + def get_first_range(self, session): """Return the first availability range for the subnet :param session: database session - :param locking: specifies whether a write-intent lock should be - performed on the database operation :return: first available range as instance of neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange """ - return self._range_query(session, locking).first() + return self._range_query(session).first() - def list_ranges_by_subnet_id(self, session, locking=False): + def list_ranges_by_subnet_id(self, session): """Return availability ranges for a given ipam subnet :param session: database session - :param locking: specifies whether a write-intent lock should be - acquired with this database operation. 
:return: list of availability ranges as instances of neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange """ - return self._range_query(session, locking) + return self._range_query(session) - def list_ranges_by_allocation_pool(self, session, allocation_pool_id, - locking=False): + def list_ranges_by_allocation_pool(self, session, allocation_pool_id): """Return availability ranges for a given pool. :param session: database session :param allocation_pool_id: allocation pool identifier - :param locking: specifies whether a write-intent lock should be - acquired with this database operation. :return: list of availability ranges as instances of neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange """ @@ -150,6 +143,46 @@ class IpamSubnetManager(object): db_models.IpamAllocationPool).filter_by( id=allocation_pool_id) + def update_range(self, session, db_range, first_ip=None, last_ip=None): + """Updates db_range to have new first_ip and last_ip. + + :param session: database session + :param db_range: IpamAvailabilityRange db object + :param first_ip: first ip address in range + :param last_ip: last ip address in range + :return: count of updated rows + """ + opts = {} + if first_ip: + opts['first_ip'] = str(first_ip) + if last_ip: + opts['last_ip'] = str(last_ip) + if not opts: + raise ipam_exc.IpamAvailabilityRangeNoChanges() + try: + return session.query( + db_models.IpamAvailabilityRange).filter_by( + allocation_pool_id=db_range.allocation_pool_id).filter_by( + first_ip=db_range.first_ip).filter_by( + last_ip=db_range.last_ip).update(opts) + except orm_exc.ObjectDeletedError: + raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed) + + def delete_range(self, session, db_range): + """Return count of deleted ranges + + :param session: database session + :param db_range: IpamAvailabilityRange db object + """ + try: + return session.query( + db_models.IpamAvailabilityRange).filter_by( + 
allocation_pool_id=db_range.allocation_pool_id).filter_by( + first_ip=db_range.first_ip).filter_by( + last_ip=db_range.last_ip).delete() + except orm_exc.ObjectDeletedError: + raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed) + def create_range(self, session, allocation_pool_id, range_start, range_end): """Create an availabilty range for a given pool. @@ -180,23 +213,18 @@ class IpamSubnetManager(object): return False return True - def list_allocations(self, session, status='ALLOCATED', locking=False): + def list_allocations(self, session, status='ALLOCATED'): """Return current allocations for the subnet. :param session: database session :param status: IP allocation status - :param locking: specifies whether a write-intent lock should be - performed on the database operation :returns: a list of IP allocation as instance of neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAllocation """ - ip_qry = session.query( + return session.query( db_models.IpamAllocation).filter_by( ipam_subnet_id=self._ipam_subnet_id, status=status) - if locking: - ip_qry = ip_qry.with_lockmode('update') - return ip_qry def create_allocation(self, session, ip_address, status='ALLOCATED'): diff --git a/neutron/ipam/drivers/neutrondb_ipam/driver.py b/neutron/ipam/drivers/neutrondb_ipam/driver.py index da2da230fd8..3a4beb1b281 100644 --- a/neutron/ipam/drivers/neutrondb_ipam/driver.py +++ b/neutron/ipam/drivers/neutrondb_ipam/driver.py @@ -14,6 +14,7 @@ # under the License. import netaddr +from oslo_db import exception as db_exc from oslo_log import log from oslo_utils import uuidutils @@ -154,7 +155,8 @@ class NeutronDbSubnet(ipam_base.Subnet): ip=ip_address) def _allocate_specific_ip(self, session, ip_address, - allocation_pool_id=None): + allocation_pool_id=None, + auto_generated=False): """Remove an IP address from subnet's availability ranges. 
This method is supposed to be called from within a database @@ -167,6 +169,7 @@ class NeutronDbSubnet(ipam_base.Subnet): :param allocation_pool_id: identifier of the allocation pool from which the ip address has been extracted. If not specified this routine will scan all allocation pools. + :param auto_generated: indicates whether ip was auto generated :returns: list of IP ranges as instances of IPAvailabilityRange """ # Return immediately for EUI-64 addresses. For this @@ -181,25 +184,28 @@ class NeutronDbSubnet(ipam_base.Subnet): # Netaddr's IPRange and IPSet objects work very well even with very # large subnets, including IPv6 ones. final_ranges = [] + ip_in_pools = False if allocation_pool_id: av_ranges = self.subnet_manager.list_ranges_by_allocation_pool( - session, allocation_pool_id, locking=True) + session, allocation_pool_id) else: - av_ranges = self.subnet_manager.list_ranges_by_subnet_id( - session, locking=True) + av_ranges = self.subnet_manager.list_ranges_by_subnet_id(session) for db_range in av_ranges: initial_ip_set = netaddr.IPSet(netaddr.IPRange( db_range['first_ip'], db_range['last_ip'])) final_ip_set = initial_ip_set - netaddr.IPSet([ip_address]) if not final_ip_set: + ip_in_pools = True # Range exhausted - bye bye - session.delete(db_range) + if not self.subnet_manager.delete_range(session, db_range): + raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed) continue if initial_ip_set == final_ip_set: # IP address does not fall within the current range, move # to the next one final_ranges.append(db_range) continue + ip_in_pools = True for new_range in final_ip_set.iter_ipranges(): # store new range in database # use netaddr.IPAddress format() method which is equivalent @@ -208,9 +214,11 @@ class NeutronDbSubnet(ipam_base.Subnet): first_ip = netaddr.IPAddress(new_range.first) last_ip = netaddr.IPAddress(new_range.last) if (db_range['first_ip'] == first_ip.format() or - db_range['last_ip'] == last_ip.format()): - db_range['first_ip'] = 
first_ip.format() - db_range['last_ip'] = last_ip.format() + db_range['last_ip'] == last_ip.format()): + rows = self.subnet_manager.update_range( + session, db_range, first_ip=first_ip, last_ip=last_ip) + if not rows: + raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed) LOG.debug("Adjusted availability range for pool %s", db_range['allocation_pool_id']) final_ranges.append(db_range) @@ -223,6 +231,11 @@ class NeutronDbSubnet(ipam_base.Subnet): LOG.debug("Created availability range for pool %s", new_ip_range['allocation_pool_id']) final_ranges.append(new_ip_range) + + # If ip is autogenerated it should be present in allocation pools, + # so retry if it is not there + if auto_generated and not ip_in_pools: + raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed) # Most callers might ignore this return value, which is however # useful for testing purposes LOG.debug("Availability ranges for subnet id %(subnet_id)s " @@ -258,7 +271,7 @@ class NeutronDbSubnet(ipam_base.Subnet): allocations = netaddr.IPSet( [netaddr.IPAddress(allocation['ip_address']) for allocation in self.subnet_manager.list_allocations( - session, locking=True)]) + session)]) # MEH MEH # There should be no need to set a write intent lock on the allocation @@ -296,7 +309,7 @@ class NeutronDbSubnet(ipam_base.Subnet): def _try_generate_ip(self, session): """Generate an IP address from availability ranges.""" - ip_range = self.subnet_manager.get_first_range(session, locking=True) + ip_range = self.subnet_manager.get_first_range(session) if not ip_range: LOG.debug("All IPs from subnet %(subnet_id)s allocated", {'subnet_id': self.subnet_manager.neutron_id}) @@ -320,22 +333,26 @@ class NeutronDbSubnet(ipam_base.Subnet): # with remote backends session = self._context.session all_pool_id = None - # NOTE(salv-orlando): It would probably better to have a simpler - # model for address requests and just check whether there is a - # specific IP address specified in address_request - if 
isinstance(address_request, ipam_req.SpecificAddressRequest): - # This handles both specific and automatic address requests - # Check availability of requested IP - ip_address = str(address_request.address) - self._verify_ip(session, ip_address) - else: - ip_address, all_pool_id = self._generate_ip(session) - self._allocate_specific_ip(session, ip_address, all_pool_id) - # Create IP allocation request object - # The only defined status at this stage is 'ALLOCATED'. - # More states will be available in the future - e.g.: RECYCLABLE - self.subnet_manager.create_allocation(session, ip_address) - return ip_address + auto_generated = False + with db_api.autonested_transaction(session): + # NOTE(salv-orlando): It would probably better to have a simpler + # model for address requests and just check whether there is a + # specific IP address specified in address_request + if isinstance(address_request, ipam_req.SpecificAddressRequest): + # This handles both specific and automatic address requests + # Check availability of requested IP + ip_address = str(address_request.address) + self._verify_ip(session, ip_address) + else: + ip_address, all_pool_id = self._generate_ip(session) + auto_generated = True + self._allocate_specific_ip(session, ip_address, all_pool_id, + auto_generated) + # Create IP allocation request object + # The only defined status at this stage is 'ALLOCATED'. 
+ # More states will be available in the future - e.g.: RECYCLABLE + self.subnet_manager.create_allocation(session, ip_address) + return ip_address def deallocate(self, address): # This is almost a no-op because the Neutron DB IPAM driver does not diff --git a/neutron/ipam/exceptions.py b/neutron/ipam/exceptions.py index 8170f2ac2b8..185e69784ba 100644 --- a/neutron/ipam/exceptions.py +++ b/neutron/ipam/exceptions.py @@ -60,3 +60,11 @@ class AllocationOnAutoAddressSubnet(exceptions.NeutronException): class IpAddressGenerationFailure(exceptions.Conflict): message = _("No more IP addresses available for subnet %(subnet_id)s.") + + +class IPAllocationFailed(exceptions.NeutronException): + message = _("IP allocation failed. Try again later.") + + +class IpamAvailabilityRangeNoChanges(exceptions.NeutronException): + message = _("New value for first_ip or last_ip has to be specified.") diff --git a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_db_api.py b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_db_api.py index fee27922b79..32b4a6a3df1 100644 --- a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_db_api.py +++ b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_db_api.py @@ -13,11 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. 
+import mock + +from oslo_db import exception as db_exc from oslo_utils import uuidutils +from sqlalchemy.orm import exc as orm_exc from neutron import context from neutron.ipam.drivers.neutrondb_ipam import db_api from neutron.ipam.drivers.neutrondb_ipam import db_models +from neutron.ipam import exceptions as ipam_exc from neutron.tests.unit import testlib_api @@ -80,24 +85,16 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase): filter_by(allocation_pool_id=db_pools[0].id).first() self._validate_ips([self.single_pool], range) - def _test_get_first_range(self, locking): - self._create_pools(self.multi_pool) - range = self.subnet_manager.get_first_range(self.ctx.session, - locking=locking) - self._validate_ips(self.multi_pool, range) - def test_get_first_range(self): - self._test_get_first_range(False) - - def test_get_first_range_locking(self): - self._test_get_first_range(True) + self._create_pools(self.multi_pool) + range = self.subnet_manager.get_first_range(self.ctx.session) + self._validate_ips(self.multi_pool, range) def test_list_ranges_by_subnet_id(self): self._create_pools(self.multi_pool) db_ranges = self.subnet_manager.list_ranges_by_subnet_id( - self.ctx.session, - self.ipam_subnet_id).all() + self.ctx.session).all() self.assertEqual(2, len(db_ranges)) self.assertEqual(db_models.IpamAvailabilityRange, type(db_ranges[0])) @@ -136,6 +133,46 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase): self.assertEqual(range_start, new_range.first_ip) self.assertEqual(range_end, new_range.last_ip) + def test_update_range(self): + self._create_pools([self.single_pool]) + db_range = self.subnet_manager.get_first_range(self.ctx.session) + updated_count = self.subnet_manager.update_range(self.ctx.session, + db_range, + first_ip='1.2.3.6', + last_ip='1.2.3.8') + self.assertEqual(1, updated_count) + + def test_update_range_no_new_values(self): + self._create_pools([self.single_pool]) + db_range = self.subnet_manager.get_first_range(self.ctx.session) + 
self.assertRaises(ipam_exc.IpamAvailabilityRangeNoChanges, + self.subnet_manager.update_range, + self.ctx.session, db_range) + + def test_update_range_reraise_error(self): + session = mock.Mock() + session.query.side_effect = orm_exc.ObjectDeletedError(None, None) + self.assertRaises(db_exc.RetryRequest, + self.subnet_manager.update_range, + session, + mock.Mock(), + first_ip='1.2.3.5') + + def test_delete_range(self): + self._create_pools([self.single_pool]) + db_range = self.subnet_manager.get_first_range(self.ctx.session) + deleted_count = self.subnet_manager.delete_range(self.ctx.session, + db_range) + self.assertEqual(1, deleted_count) + + def test_delete_range_reraise_error(self): + session = mock.Mock() + session.query.side_effect = orm_exc.ObjectDeletedError(None, None) + self.assertRaises(db_exc.RetryRequest, + self.subnet_manager.delete_range, + session, + mock.Mock()) + def test_check_unique_allocation(self): self.assertTrue(self.subnet_manager.check_unique_allocation( self.ctx.session, self.subnet_ip)) diff --git a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py index 5a3f6d6e9cb..4bb43f9ee8b 100644 --- a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py +++ b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py @@ -13,8 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. 
+import mock import netaddr +from oslo_db import exception as db_exc + from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc @@ -444,3 +447,16 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase, subnet_req = ipam_req.SpecificSubnetRequest( 'tenant_id', 'meh', '192.168.0.0/24') self.ipam_pool.allocate_subnet(subnet_req) + + def test__allocate_specific_ip_raises_exception(self): + cidr = '10.0.0.0/24' + ip = '10.0.0.15' + ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0] + ipam_subnet.subnet_manager = mock.Mock() + ipam_subnet.subnet_manager.list_ranges_by_subnet_id.return_value = [{ + 'first_ip': '10.0.0.15', 'last_ip': '10.0.0.15'}] + ipam_subnet.subnet_manager.delete_range.return_value = 0 + + self.assertRaises(db_exc.RetryRequest, + ipam_subnet._allocate_specific_ip, + self.ctx.session, ip)