Merge "Abstract sync_allocations"

This commit is contained in:
Jenkins 2015-07-15 00:24:24 +00:00 committed by Gerrit Code Review
commit 9d84a424f2
5 changed files with 77 additions and 120 deletions

View File

@ -14,16 +14,13 @@
# under the License. # under the License.
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log from oslo_log import log
from six import moves
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import sql from sqlalchemy import sql
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import model_base from neutron.db import model_base
from neutron.i18n import _LE, _LW from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2.drivers import type_tunnel
@ -83,44 +80,6 @@ class GreTypeDriver(type_tunnel.EndpointTunnelTypeDriver):
"Service terminated!")) "Service terminated!"))
raise SystemExit() raise SystemExit()
def sync_allocations(self):
# determine current configured allocatable gres
gre_ids = set()
for gre_id_range in self.tunnel_ranges:
tun_min, tun_max = gre_id_range
gre_ids |= set(moves.range(tun_min, tun_max + 1))
session = db_api.get_session()
try:
self._add_allocation(session, gre_ids)
except db_exc.DBDuplicateEntry:
# in case multiple neutron-servers start allocations could be
# already added by different neutron-server. because this function
# is called only when initializing this type driver, it's safe to
# assume allocations were added.
LOG.warning(_LW("Gre allocations were already created."))
def _add_allocation(self, session, gre_ids):
with session.begin(subtransactions=True):
# remove from table unallocated tunnels not currently allocatable
allocs = (session.query(GreAllocation).all())
for alloc in allocs:
try:
# see if tunnel is allocatable
gre_ids.remove(alloc.gre_id)
except KeyError:
# it's not allocatable, so check if its allocated
if not alloc.allocated:
# it's not, so remove it from table
LOG.debug("Removing tunnel %s from pool", alloc.gre_id)
session.delete(alloc)
# add missing allocatable tunnels to table
for gre_id in sorted(gre_ids):
alloc = GreAllocation(gre_id=gre_id)
session.add(alloc)
def get_endpoints(self): def get_endpoints(self):
"""Get every gre endpoints from database.""" """Get every gre endpoints from database."""
gre_endpoints = self._get_endpoints() gre_endpoints = self._get_endpoints()

View File

@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import abc import abc
import itertools
import operator
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
from oslo_log import log from oslo_log import log
from six import moves
from neutron.common import exceptions as exc from neutron.common import exceptions as exc
from neutron.common import topics from neutron.common import topics
@ -31,21 +34,27 @@ LOG = log.getLogger(__name__)
TUNNEL = 'tunnel' TUNNEL = 'tunnel'
def chunks(iterable, chunk_size):
"""Chunks data into chunk with size<=chunk_size."""
iterator = iter(iterable)
chunk = list(itertools.islice(iterator, 0, chunk_size))
while chunk:
yield chunk
chunk = list(itertools.islice(iterator, 0, chunk_size))
class TunnelTypeDriver(helpers.SegmentTypeDriver): class TunnelTypeDriver(helpers.SegmentTypeDriver):
"""Define stable abstract interface for ML2 type drivers. """Define stable abstract interface for ML2 type drivers.
tunnel type networks rely on tunnel endpoints. This class defines abstract tunnel type networks rely on tunnel endpoints. This class defines abstract
methods to manage these endpoints. methods to manage these endpoints.
""" """
BULK_SIZE = 100
def __init__(self, model): def __init__(self, model):
super(TunnelTypeDriver, self).__init__(model) super(TunnelTypeDriver, self).__init__(model)
self.segmentation_key = next(iter(self.primary_keys)) self.segmentation_key = next(iter(self.primary_keys))
@abc.abstractmethod
def sync_allocations(self):
"""Synchronize type_driver allocation table with configured ranges."""
@abc.abstractmethod @abc.abstractmethod
def add_endpoint(self, ip, host): def add_endpoint(self, ip, host):
"""Register the endpoint in the type_driver database. """Register the endpoint in the type_driver database.
@ -113,6 +122,40 @@ class TunnelTypeDriver(helpers.SegmentTypeDriver):
LOG.info(_LI("%(type)s ID ranges: %(range)s"), LOG.info(_LI("%(type)s ID ranges: %(range)s"),
{'type': self.get_type(), 'range': current_range}) {'type': self.get_type(), 'range': current_range})
def sync_allocations(self):
# determine current configured allocatable tunnel ids
tunnel_ids = set()
for tun_min, tun_max in self.tunnel_ranges:
tunnel_ids |= set(moves.range(tun_min, tun_max + 1))
tunnel_id_getter = operator.attrgetter(self.segmentation_key)
tunnel_col = getattr(self.model, self.segmentation_key)
session = db_api.get_session()
with session.begin(subtransactions=True):
# remove from table unallocated tunnels not currently allocatable
# fetch results as list via all() because we'll be iterating
# through them twice
allocs = (session.query(self.model).
with_lockmode("update").all())
# collect those vnis that needs to be deleted from db
unallocateds = (
tunnel_id_getter(a) for a in allocs if not a.allocated)
to_remove = (x for x in unallocateds if x not in tunnel_ids)
# Immediately delete tunnels in chunks. This leaves no work for
# flush at the end of transaction
for chunk in chunks(to_remove, self.BULK_SIZE):
session.query(self.model).filter(
tunnel_col.in_(chunk)).delete(synchronize_session=False)
# collect vnis that need to be added
existings = {tunnel_id_getter(a) for a in allocs}
missings = list(tunnel_ids - existings)
for chunk in chunks(missings, self.BULK_SIZE):
bulk = [{self.segmentation_key: x, 'allocated': False}
for x in chunk]
session.execute(self.model.__table__.insert(), bulk)
def is_partial_segment(self, segment): def is_partial_segment(self, segment):
return segment.get(api.SEGMENTATION_ID) is None return segment.get(api.SEGMENTATION_ID) is None

View File

@ -15,12 +15,10 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from six import moves
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import sql from sqlalchemy import sql
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.db import api as db_api
from neutron.db import model_base from neutron.db import model_base
from neutron.i18n import _LE from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
@ -86,45 +84,6 @@ class VxlanTypeDriver(type_tunnel.EndpointTunnelTypeDriver):
"Service terminated!")) "Service terminated!"))
raise SystemExit() raise SystemExit()
def sync_allocations(self):
# determine current configured allocatable vnis
vxlan_vnis = set()
for tun_min, tun_max in self.tunnel_ranges:
vxlan_vnis |= set(moves.range(tun_min, tun_max + 1))
session = db_api.get_session()
with session.begin(subtransactions=True):
# remove from table unallocated tunnels not currently allocatable
# fetch results as list via all() because we'll be iterating
# through them twice
allocs = (session.query(VxlanAllocation).
with_lockmode("update").all())
# collect all vnis present in db
existing_vnis = set(alloc.vxlan_vni for alloc in allocs)
# collect those vnis that needs to be deleted from db
vnis_to_remove = [alloc.vxlan_vni for alloc in allocs
if (alloc.vxlan_vni not in vxlan_vnis and
not alloc.allocated)]
# Immediately delete vnis in chunks. This leaves no work for
# flush at the end of transaction
bulk_size = 100
chunked_vnis = (vnis_to_remove[i:i + bulk_size] for i in
range(0, len(vnis_to_remove), bulk_size))
for vni_list in chunked_vnis:
if vni_list:
session.query(VxlanAllocation).filter(
VxlanAllocation.vxlan_vni.in_(vni_list)).delete(
synchronize_session=False)
# collect vnis that need to be added
vnis = list(vxlan_vnis - existing_vnis)
chunked_vnis = (vnis[i:i + bulk_size] for i in
range(0, len(vnis), bulk_size))
for vni_list in chunked_vnis:
bulk = [{'vxlan_vni': vni, 'allocated': False}
for vni in vni_list]
session.execute(VxlanAllocation.__table__.insert(), bulk)
def get_endpoints(self): def get_endpoints(self):
"""Get every vxlan endpoints from database.""" """Get every vxlan endpoints from database."""
vxlan_endpoints = self._get_endpoints() vxlan_endpoints = self._get_endpoints()

View File

@ -93,6 +93,35 @@ class TunnelTypeTestMixin(object):
self.assertIsNone( self.assertIsNone(
self.driver.get_allocation(self.session, (TUN_MAX + 5 + 1))) self.driver.get_allocation(self.session, (TUN_MAX + 5 + 1)))
def _test_sync_allocations_and_allocated(self, tunnel_id):
segment = {api.NETWORK_TYPE: self.TYPE,
api.PHYSICAL_NETWORK: None,
api.SEGMENTATION_ID: tunnel_id}
self.driver.reserve_provider_segment(self.session, segment)
self.driver.tunnel_ranges = UPDATED_TUNNEL_RANGES
self.driver.sync_allocations()
self.assertTrue(
self.driver.get_allocation(self.session, tunnel_id).allocated)
def test_sync_allocations_and_allocated_in_initial_range(self):
self._test_sync_allocations_and_allocated(TUN_MIN + 2)
def test_sync_allocations_and_allocated_in_final_range(self):
self._test_sync_allocations_and_allocated(TUN_MAX + 2)
def test_sync_allocations_no_op(self):
def verify_no_chunk(iterable, chunk_size):
# no segment removed/added
self.assertEqual(0, len(list(iterable)))
return []
with mock.patch.object(
type_tunnel, 'chunks', side_effect=verify_no_chunk) as chunks:
self.driver.sync_allocations()
self.assertEqual(2, len(chunks.mock_calls))
def test_partial_segment_is_partial_segment(self): def test_partial_segment_is_partial_segment(self):
segment = {api.NETWORK_TYPE: self.TYPE, segment = {api.NETWORK_TYPE: self.TYPE,
api.PHYSICAL_NETWORK: None, api.PHYSICAL_NETWORK: None,

View File

@ -13,13 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import mock
from oslo_db import exception as db_exc
from sqlalchemy.orm import exc as sa_exc
import testtools
from neutron.db import api as db_api
from neutron.plugins.common import constants as p_const from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import config from neutron.plugins.ml2 import config
from neutron.plugins.ml2.drivers import type_gre from neutron.plugins.ml2.drivers import type_gre
@ -62,32 +55,6 @@ class GreTypeTest(base_type_tunnel.TunnelTypeTestMixin,
elif endpoint['ip_address'] == base_type_tunnel.TUNNEL_IP_TWO: elif endpoint['ip_address'] == base_type_tunnel.TUNNEL_IP_TWO:
self.assertEqual(base_type_tunnel.HOST_TWO, endpoint['host']) self.assertEqual(base_type_tunnel.HOST_TWO, endpoint['host'])
def test_sync_allocations_entry_added_during_session(self):
with mock.patch.object(self.driver, '_add_allocation',
side_effect=db_exc.DBDuplicateEntry) as (
mock_add_allocation):
self.driver.sync_allocations()
self.assertTrue(mock_add_allocation.called)
def test__add_allocation_not_existing(self):
session = db_api.get_session()
_add_allocation(session, gre_id=1)
self.driver._add_allocation(session, {1, 2})
_get_allocation(session, 2)
def test__add_allocation_existing_allocated_is_kept(self):
session = db_api.get_session()
_add_allocation(session, gre_id=1, allocated=True)
self.driver._add_allocation(session, {2})
_get_allocation(session, 1)
def test__add_allocation_existing_not_allocated_is_removed(self):
session = db_api.get_session()
_add_allocation(session, gre_id=1)
self.driver._add_allocation(session, {2})
with testtools.ExpectedException(sa_exc.NoResultFound):
_get_allocation(session, 1)
def test_get_mtu(self): def test_get_mtu(self):
config.cfg.CONF.set_override('segment_mtu', 1500, group='ml2') config.cfg.CONF.set_override('segment_mtu', 1500, group='ml2')
config.cfg.CONF.set_override('path_mtu', 1475, group='ml2') config.cfg.CONF.set_override('path_mtu', 1475, group='ml2')