diff --git a/neutron/plugins/ml2/drivers/type_gre.py b/neutron/plugins/ml2/drivers/type_gre.py index 5db7074c73c..53b907c884c 100644 --- a/neutron/plugins/ml2/drivers/type_gre.py +++ b/neutron/plugins/ml2/drivers/type_gre.py @@ -14,16 +14,13 @@ # under the License. from oslo_config import cfg -from oslo_db import exception as db_exc from oslo_log import log -from six import moves import sqlalchemy as sa from sqlalchemy import sql from neutron.common import exceptions as n_exc -from neutron.db import api as db_api from neutron.db import model_base -from neutron.i18n import _LE, _LW +from neutron.i18n import _LE from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers import type_tunnel @@ -83,44 +80,6 @@ class GreTypeDriver(type_tunnel.EndpointTunnelTypeDriver): "Service terminated!")) raise SystemExit() - def sync_allocations(self): - - # determine current configured allocatable gres - gre_ids = set() - for gre_id_range in self.tunnel_ranges: - tun_min, tun_max = gre_id_range - gre_ids |= set(moves.range(tun_min, tun_max + 1)) - - session = db_api.get_session() - try: - self._add_allocation(session, gre_ids) - except db_exc.DBDuplicateEntry: - # in case multiple neutron-servers start allocations could be - # already added by different neutron-server. because this function - # is called only when initializing this type driver, it's safe to - # assume allocations were added. - LOG.warning(_LW("Gre allocations were already created.")) - - def _add_allocation(self, session, gre_ids): - with session.begin(subtransactions=True): - # remove from table unallocated tunnels not currently allocatable - allocs = (session.query(GreAllocation).all()) - for alloc in allocs: - try: - # see if tunnel is allocatable - gre_ids.remove(alloc.gre_id) - except KeyError: - # it's not allocatable, so check if its allocated - if not alloc.allocated: - # it's not, so remove it from table - LOG.debug("Removing tunnel %s from pool", alloc.gre_id) - session.delete(alloc) - - # add missing allocatable tunnels to table - for gre_id in sorted(gre_ids): - alloc = GreAllocation(gre_id=gre_id) - session.add(alloc) - def get_endpoints(self): """Get every gre endpoints from database.""" gre_endpoints = self._get_endpoints() diff --git a/neutron/plugins/ml2/drivers/type_tunnel.py b/neutron/plugins/ml2/drivers/type_tunnel.py index 258e78c2644..054d546801d 100644 --- a/neutron/plugins/ml2/drivers/type_tunnel.py +++ b/neutron/plugins/ml2/drivers/type_tunnel.py @@ -13,10 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. import abc +import itertools +import operator from oslo_config import cfg from oslo_db import exception as db_exc from oslo_log import log +from six import moves from neutron.common import exceptions as exc from neutron.common import topics @@ -31,21 +34,27 @@ LOG = log.getLogger(__name__) TUNNEL = 'tunnel' +def chunks(iterable, chunk_size): + """Chunks data into chunk with size<=chunk_size.""" + iterator = iter(iterable) + chunk = list(itertools.islice(iterator, 0, chunk_size)) + while chunk: + yield chunk + chunk = list(itertools.islice(iterator, 0, chunk_size)) + + class TunnelTypeDriver(helpers.SegmentTypeDriver): """Define stable abstract interface for ML2 type drivers. tunnel type networks rely on tunnel endpoints. This class defines abstract methods to manage these endpoints. """ + BULK_SIZE = 100 def __init__(self, model): super(TunnelTypeDriver, self).__init__(model) self.segmentation_key = next(iter(self.primary_keys)) - @abc.abstractmethod - def sync_allocations(self): - """Synchronize type_driver allocation table with configured ranges.""" - @abc.abstractmethod def add_endpoint(self, ip, host): """Register the endpoint in the type_driver database. @@ -113,6 +122,40 @@ class TunnelTypeDriver(helpers.SegmentTypeDriver): LOG.info(_LI("%(type)s ID ranges: %(range)s"), {'type': self.get_type(), 'range': current_range}) + def sync_allocations(self): + # determine current configured allocatable tunnel ids + tunnel_ids = set() + for tun_min, tun_max in self.tunnel_ranges: + tunnel_ids |= set(moves.range(tun_min, tun_max + 1)) + + tunnel_id_getter = operator.attrgetter(self.segmentation_key) + tunnel_col = getattr(self.model, self.segmentation_key) + session = db_api.get_session() + with session.begin(subtransactions=True): + # remove from table unallocated tunnels not currently allocatable + # fetch results as list via all() because we'll be iterating + # through them twice + allocs = (session.query(self.model). + with_lockmode("update").all()) + + # collect those vnis that needs to be deleted from db + unallocateds = ( + tunnel_id_getter(a) for a in allocs if not a.allocated) + to_remove = (x for x in unallocateds if x not in tunnel_ids) + # Immediately delete tunnels in chunks. This leaves no work for + # flush at the end of transaction + for chunk in chunks(to_remove, self.BULK_SIZE): + session.query(self.model).filter( + tunnel_col.in_(chunk)).delete(synchronize_session=False) + + # collect vnis that need to be added + existings = {tunnel_id_getter(a) for a in allocs} + missings = list(tunnel_ids - existings) + for chunk in chunks(missings, self.BULK_SIZE): + bulk = [{self.segmentation_key: x, 'allocated': False} + for x in chunk] + session.execute(self.model.__table__.insert(), bulk) + def is_partial_segment(self, segment): return segment.get(api.SEGMENTATION_ID) is None diff --git a/neutron/plugins/ml2/drivers/type_vxlan.py b/neutron/plugins/ml2/drivers/type_vxlan.py index 52e5f7eaee7..c6f9dbf1073 100644 --- a/neutron/plugins/ml2/drivers/type_vxlan.py +++ b/neutron/plugins/ml2/drivers/type_vxlan.py @@ -15,12 +15,10 @@ from oslo_config import cfg from oslo_log import log -from six import moves import sqlalchemy as sa from sqlalchemy import sql from neutron.common import exceptions as n_exc -from neutron.db import api as db_api from neutron.db import model_base from neutron.i18n import _LE from neutron.plugins.common import constants as p_const @@ -86,45 +84,6 @@ class VxlanTypeDriver(type_tunnel.EndpointTunnelTypeDriver): "Service terminated!")) raise SystemExit() - def sync_allocations(self): - - # determine current configured allocatable vnis - vxlan_vnis = set() - for tun_min, tun_max in self.tunnel_ranges: - vxlan_vnis |= set(moves.range(tun_min, tun_max + 1)) - - session = db_api.get_session() - with session.begin(subtransactions=True): - # remove from table unallocated tunnels not currently allocatable - # fetch results as list via all() because we'll be iterating - # through them twice - allocs = (session.query(VxlanAllocation). - with_lockmode("update").all()) - # collect all vnis present in db - existing_vnis = set(alloc.vxlan_vni for alloc in allocs) - # collect those vnis that needs to be deleted from db - vnis_to_remove = [alloc.vxlan_vni for alloc in allocs - if (alloc.vxlan_vni not in vxlan_vnis and - not alloc.allocated)] - # Immediately delete vnis in chunks. This leaves no work for - # flush at the end of transaction - bulk_size = 100 - chunked_vnis = (vnis_to_remove[i:i + bulk_size] for i in - range(0, len(vnis_to_remove), bulk_size)) - for vni_list in chunked_vnis: - if vni_list: - session.query(VxlanAllocation).filter( - VxlanAllocation.vxlan_vni.in_(vni_list)).delete( - synchronize_session=False) - # collect vnis that need to be added - vnis = list(vxlan_vnis - existing_vnis) - chunked_vnis = (vnis[i:i + bulk_size] for i in - range(0, len(vnis), bulk_size)) - for vni_list in chunked_vnis: - bulk = [{'vxlan_vni': vni, 'allocated': False} - for vni in vni_list] - session.execute(VxlanAllocation.__table__.insert(), bulk) - def get_endpoints(self): """Get every vxlan endpoints from database.""" vxlan_endpoints = self._get_endpoints() diff --git a/neutron/tests/unit/plugins/ml2/drivers/base_type_tunnel.py b/neutron/tests/unit/plugins/ml2/drivers/base_type_tunnel.py index 725fdaab18e..5bbb3ec38dc 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/base_type_tunnel.py +++ b/neutron/tests/unit/plugins/ml2/drivers/base_type_tunnel.py @@ -93,6 +93,35 @@ class TunnelTypeTestMixin(object): self.assertIsNone( self.driver.get_allocation(self.session, (TUN_MAX + 5 + 1))) + def _test_sync_allocations_and_allocated(self, tunnel_id): + segment = {api.NETWORK_TYPE: self.TYPE, + api.PHYSICAL_NETWORK: None, + api.SEGMENTATION_ID: tunnel_id} + self.driver.reserve_provider_segment(self.session, segment) + + self.driver.tunnel_ranges = UPDATED_TUNNEL_RANGES + self.driver.sync_allocations() + + self.assertTrue( + self.driver.get_allocation(self.session, tunnel_id).allocated) + + def test_sync_allocations_and_allocated_in_initial_range(self): + self._test_sync_allocations_and_allocated(TUN_MIN + 2) + + def test_sync_allocations_and_allocated_in_final_range(self): + self._test_sync_allocations_and_allocated(TUN_MAX + 2) + + def test_sync_allocations_no_op(self): + + def verify_no_chunk(iterable, chunk_size): + # no segment removed/added + self.assertEqual(0, len(list(iterable))) + return [] + with mock.patch.object( + type_tunnel, 'chunks', side_effect=verify_no_chunk) as chunks: + self.driver.sync_allocations() + self.assertEqual(2, len(chunks.mock_calls)) + def test_partial_segment_is_partial_segment(self): segment = {api.NETWORK_TYPE: self.TYPE, api.PHYSICAL_NETWORK: None, diff --git a/neutron/tests/unit/plugins/ml2/drivers/test_type_gre.py b/neutron/tests/unit/plugins/ml2/drivers/test_type_gre.py index ec4d342012b..0471c68ec41 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/test_type_gre.py +++ b/neutron/tests/unit/plugins/ml2/drivers/test_type_gre.py @@ -13,13 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import mock - -from oslo_db import exception as db_exc -from sqlalchemy.orm import exc as sa_exc -import testtools - -from neutron.db import api as db_api from neutron.plugins.common import constants as p_const from neutron.plugins.ml2 import config from neutron.plugins.ml2.drivers import type_gre @@ -62,32 +55,6 @@ class GreTypeTest(base_type_tunnel.TunnelTypeTestMixin, elif endpoint['ip_address'] == base_type_tunnel.TUNNEL_IP_TWO: self.assertEqual(base_type_tunnel.HOST_TWO, endpoint['host']) - def test_sync_allocations_entry_added_during_session(self): - with mock.patch.object(self.driver, '_add_allocation', - side_effect=db_exc.DBDuplicateEntry) as ( - mock_add_allocation): - self.driver.sync_allocations() - self.assertTrue(mock_add_allocation.called) - - def test__add_allocation_not_existing(self): - session = db_api.get_session() - _add_allocation(session, gre_id=1) - self.driver._add_allocation(session, {1, 2}) - _get_allocation(session, 2) - - def test__add_allocation_existing_allocated_is_kept(self): - session = db_api.get_session() - _add_allocation(session, gre_id=1, allocated=True) - self.driver._add_allocation(session, {2}) - _get_allocation(session, 1) - - def test__add_allocation_existing_not_allocated_is_removed(self): - session = db_api.get_session() - _add_allocation(session, gre_id=1) - self.driver._add_allocation(session, {2}) - with testtools.ExpectedException(sa_exc.NoResultFound): - _get_allocation(session, 1) - def test_get_mtu(self): config.cfg.CONF.set_override('segment_mtu', 1500, group='ml2') config.cfg.CONF.set_override('path_mtu', 1475, group='ml2')