Abstract sync_allocations

Currently gre/vxlan type drivers have specific sync_allocations
implementations to perform the same feature. This change abstracts
(vxlan) implementation and moves it to TunnelTypeDriver[1] in order to
share implementation between gre/vxlan; this change also adds some
unittests.

Current gre/vxlan and new implementations of sync_allocations are not
Galera-compliant with multi-writers, a follow-up will update it to
fully support Galera.

[1] neutron.plugins.ml2.drivers.type_tunnel

Change-Id: I188a7cf718d811084475f6783d844588de5d60ea
This commit is contained in:
Cedric Brandily 2015-06-15 22:20:16 +02:00
parent a5bf502fab
commit 7c273fe218
5 changed files with 77 additions and 120 deletions

View File

@ -14,16 +14,13 @@
# under the License.
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
from six import moves
import sqlalchemy as sa
from sqlalchemy import sql
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.i18n import _LE, _LW
from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel
@ -83,44 +80,6 @@ class GreTypeDriver(type_tunnel.EndpointTunnelTypeDriver):
"Service terminated!"))
raise SystemExit()
def sync_allocations(self):
# determine current configured allocatable gres
gre_ids = set()
for gre_id_range in self.tunnel_ranges:
tun_min, tun_max = gre_id_range
gre_ids |= set(moves.range(tun_min, tun_max + 1))
session = db_api.get_session()
try:
self._add_allocation(session, gre_ids)
except db_exc.DBDuplicateEntry:
# in case multiple neutron-servers start allocations could be
# already added by different neutron-server. because this function
# is called only when initializing this type driver, it's safe to
# assume allocations were added.
LOG.warning(_LW("Gre allocations were already created."))
def _add_allocation(self, session, gre_ids):
with session.begin(subtransactions=True):
# remove from table unallocated tunnels not currently allocatable
allocs = (session.query(GreAllocation).all())
for alloc in allocs:
try:
# see if tunnel is allocatable
gre_ids.remove(alloc.gre_id)
except KeyError:
# it's not allocatable, so check if its allocated
if not alloc.allocated:
# it's not, so remove it from table
LOG.debug("Removing tunnel %s from pool", alloc.gre_id)
session.delete(alloc)
# add missing allocatable tunnels to table
for gre_id in sorted(gre_ids):
alloc = GreAllocation(gre_id=gre_id)
session.add(alloc)
def get_endpoints(self):
"""Get every gre endpoints from database."""
gre_endpoints = self._get_endpoints()

View File

@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import operator
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
from six import moves
from neutron.common import exceptions as exc
from neutron.common import topics
@ -31,21 +34,27 @@ LOG = log.getLogger(__name__)
TUNNEL = 'tunnel'
def chunks(iterable, chunk_size):
"""Chunks data into chunk with size<=chunk_size."""
iterator = iter(iterable)
chunk = list(itertools.islice(iterator, 0, chunk_size))
while chunk:
yield chunk
chunk = list(itertools.islice(iterator, 0, chunk_size))
class TunnelTypeDriver(helpers.SegmentTypeDriver):
"""Define stable abstract interface for ML2 type drivers.
tunnel type networks rely on tunnel endpoints. This class defines abstract
methods to manage these endpoints.
"""
BULK_SIZE = 100
def __init__(self, model):
super(TunnelTypeDriver, self).__init__(model)
self.segmentation_key = next(iter(self.primary_keys))
@abc.abstractmethod
def sync_allocations(self):
"""Synchronize type_driver allocation table with configured ranges."""
@abc.abstractmethod
def add_endpoint(self, ip, host):
"""Register the endpoint in the type_driver database.
@ -113,6 +122,40 @@ class TunnelTypeDriver(helpers.SegmentTypeDriver):
LOG.info(_LI("%(type)s ID ranges: %(range)s"),
{'type': self.get_type(), 'range': current_range})
def sync_allocations(self):
# determine current configured allocatable tunnel ids
tunnel_ids = set()
for tun_min, tun_max in self.tunnel_ranges:
tunnel_ids |= set(moves.range(tun_min, tun_max + 1))
tunnel_id_getter = operator.attrgetter(self.segmentation_key)
tunnel_col = getattr(self.model, self.segmentation_key)
session = db_api.get_session()
with session.begin(subtransactions=True):
# remove from table unallocated tunnels not currently allocatable
# fetch results as list via all() because we'll be iterating
# through them twice
allocs = (session.query(self.model).
with_lockmode("update").all())
# collect those vnis that needs to be deleted from db
unallocateds = (
tunnel_id_getter(a) for a in allocs if not a.allocated)
to_remove = (x for x in unallocateds if x not in tunnel_ids)
# Immediately delete tunnels in chunks. This leaves no work for
# flush at the end of transaction
for chunk in chunks(to_remove, self.BULK_SIZE):
session.query(self.model).filter(
tunnel_col.in_(chunk)).delete(synchronize_session=False)
# collect vnis that need to be added
existings = {tunnel_id_getter(a) for a in allocs}
missings = list(tunnel_ids - existings)
for chunk in chunks(missings, self.BULK_SIZE):
bulk = [{self.segmentation_key: x, 'allocated': False}
for x in chunk]
session.execute(self.model.__table__.insert(), bulk)
def is_partial_segment(self, segment):
return segment.get(api.SEGMENTATION_ID) is None

View File

@ -15,12 +15,10 @@
from oslo_config import cfg
from oslo_log import log
from six import moves
import sqlalchemy as sa
from sqlalchemy import sql
from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const
@ -86,45 +84,6 @@ class VxlanTypeDriver(type_tunnel.EndpointTunnelTypeDriver):
"Service terminated!"))
raise SystemExit()
def sync_allocations(self):
# determine current configured allocatable vnis
vxlan_vnis = set()
for tun_min, tun_max in self.tunnel_ranges:
vxlan_vnis |= set(moves.range(tun_min, tun_max + 1))
session = db_api.get_session()
with session.begin(subtransactions=True):
# remove from table unallocated tunnels not currently allocatable
# fetch results as list via all() because we'll be iterating
# through them twice
allocs = (session.query(VxlanAllocation).
with_lockmode("update").all())
# collect all vnis present in db
existing_vnis = set(alloc.vxlan_vni for alloc in allocs)
# collect those vnis that needs to be deleted from db
vnis_to_remove = [alloc.vxlan_vni for alloc in allocs
if (alloc.vxlan_vni not in vxlan_vnis and
not alloc.allocated)]
# Immediately delete vnis in chunks. This leaves no work for
# flush at the end of transaction
bulk_size = 100
chunked_vnis = (vnis_to_remove[i:i + bulk_size] for i in
range(0, len(vnis_to_remove), bulk_size))
for vni_list in chunked_vnis:
if vni_list:
session.query(VxlanAllocation).filter(
VxlanAllocation.vxlan_vni.in_(vni_list)).delete(
synchronize_session=False)
# collect vnis that need to be added
vnis = list(vxlan_vnis - existing_vnis)
chunked_vnis = (vnis[i:i + bulk_size] for i in
range(0, len(vnis), bulk_size))
for vni_list in chunked_vnis:
bulk = [{'vxlan_vni': vni, 'allocated': False}
for vni in vni_list]
session.execute(VxlanAllocation.__table__.insert(), bulk)
def get_endpoints(self):
"""Get every vxlan endpoints from database."""
vxlan_endpoints = self._get_endpoints()

View File

@ -93,6 +93,35 @@ class TunnelTypeTestMixin(object):
self.assertIsNone(
self.driver.get_allocation(self.session, (TUN_MAX + 5 + 1)))
def _test_sync_allocations_and_allocated(self, tunnel_id):
segment = {api.NETWORK_TYPE: self.TYPE,
api.PHYSICAL_NETWORK: None,
api.SEGMENTATION_ID: tunnel_id}
self.driver.reserve_provider_segment(self.session, segment)
self.driver.tunnel_ranges = UPDATED_TUNNEL_RANGES
self.driver.sync_allocations()
self.assertTrue(
self.driver.get_allocation(self.session, tunnel_id).allocated)
def test_sync_allocations_and_allocated_in_initial_range(self):
self._test_sync_allocations_and_allocated(TUN_MIN + 2)
def test_sync_allocations_and_allocated_in_final_range(self):
self._test_sync_allocations_and_allocated(TUN_MAX + 2)
def test_sync_allocations_no_op(self):
def verify_no_chunk(iterable, chunk_size):
# no segment removed/added
self.assertEqual(0, len(list(iterable)))
return []
with mock.patch.object(
type_tunnel, 'chunks', side_effect=verify_no_chunk) as chunks:
self.driver.sync_allocations()
self.assertEqual(2, len(chunks.mock_calls))
def test_partial_segment_is_partial_segment(self):
segment = {api.NETWORK_TYPE: self.TYPE,
api.PHYSICAL_NETWORK: None,

View File

@ -13,13 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_db import exception as db_exc
from sqlalchemy.orm import exc as sa_exc
import testtools
from neutron.db import api as db_api
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import config
from neutron.plugins.ml2.drivers import type_gre
@ -62,32 +55,6 @@ class GreTypeTest(base_type_tunnel.TunnelTypeTestMixin,
elif endpoint['ip_address'] == base_type_tunnel.TUNNEL_IP_TWO:
self.assertEqual(base_type_tunnel.HOST_TWO, endpoint['host'])
def test_sync_allocations_entry_added_during_session(self):
with mock.patch.object(self.driver, '_add_allocation',
side_effect=db_exc.DBDuplicateEntry) as (
mock_add_allocation):
self.driver.sync_allocations()
self.assertTrue(mock_add_allocation.called)
def test__add_allocation_not_existing(self):
session = db_api.get_session()
_add_allocation(session, gre_id=1)
self.driver._add_allocation(session, {1, 2})
_get_allocation(session, 2)
def test__add_allocation_existing_allocated_is_kept(self):
session = db_api.get_session()
_add_allocation(session, gre_id=1, allocated=True)
self.driver._add_allocation(session, {2})
_get_allocation(session, 1)
def test__add_allocation_existing_not_allocated_is_removed(self):
session = db_api.get_session()
_add_allocation(session, gre_id=1)
self.driver._add_allocation(session, {2})
with testtools.ExpectedException(sa_exc.NoResultFound):
_get_allocation(session, 1)
def test_get_mtu(self):
config.cfg.CONF.set_override('segment_mtu', 1500, group='ml2')
config.cfg.CONF.set_override('path_mtu', 1475, group='ml2')