Use context manager from neutron-lib accessing VlanAllocation

Changed the code to use context manager for updating ml2_vlan_allocations db when
arista_vlan type driver is in use. This also makes the code compatible with
SQLAlchemy version 2.0 where Session.autocommit is removed

Change-Id: I75b760a27d7aef29591c0efc03542d37a30afd70
changes/65/867265/6
Nader Lahouti 2022-12-12 08:50:36 -08:00
parent e04620b729
commit 0fff7b8ae3
4 changed files with 203 additions and 70 deletions

View File

@ -157,7 +157,7 @@ class AristaRPCWrapperEapi(AristaRPCWrapperBase):
VLANs for the region
"""
if not self.cli_commands['resource-pool']:
LOG.warning(_('The version of CVX you are using does not support'
LOG.warning(_('The version of CVX you are using does not support '
'arista VLAN type driver.'))
else:
cmd = ['show openstack resource-pools region %s' % self.region]

View File

@ -13,11 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib import constants as n_const
from neutron_lib import context
from neutron_lib.db import api as db_api
from oslo_log import log
from six import moves
from neutron.db.models.plugins.ml2 import vlanallocation
from neutron.objects import network as network_obj
from neutron.objects.plugins.ml2 import vlanallocation as vlanalloc
from networking_arista._i18n import _LI
from networking_arista.common.constants import EOS_UNREACHABLE_MSG
@ -104,44 +109,67 @@ class VlanSyncService(object):
self._force_sync = True
return
LOG.debug('vlan_pool %(vlan)s', {'vlan': vlan_pool})
self._assigned_vlans = {
'default': self._parse_vlan_ranges(vlan_pool['assignedVlans'],
return_as_ranges=True),
'default': self._parse_vlan_ranges(vlan_pool['assignedVlans']),
}
assigned_vlans = (
self._parse_vlan_ranges(vlan_pool['assignedVlans']))
available_vlans = frozenset(
cvx_available_vlans = frozenset(
self._parse_vlan_ranges(vlan_pool['availableVlans']))
used_vlans = frozenset(
cvx_used_vlans = frozenset(
self._parse_vlan_ranges(vlan_pool['allocatedVlans']))
# Force vlan sync if assignedVlans is empty or availableVlans and
# allocatedVlans both are empty in the vlan_pool
if not(self._assigned_vlans['default'] and
(cvx_available_vlans or cvx_used_vlans)):
LOG.info(_LI('Force sync, vlan pool is empty'))
self.force_sync()
else:
self._force_sync = False
self._force_sync = False
allocated_vlans = {}
ctx = context.get_admin_context()
with db_api.CONTEXT_READER.using(ctx):
for physical_network in self._assigned_vlans:
filter = {
'network_type': n_const.TYPE_VLAN,
'physical_network': physical_network,
}
objs = network_obj.NetworkSegment.get_objects(ctx, **filter)
allocated_vlans.update(
{physical_network: [obj.segmentation_id for obj in objs]})
LOG.debug('allocated vlans %(vlan)s', {'vlan': allocated_vlans})
session = db_api.get_writer_session()
with session.begin(subtransactions=True):
allocs = (
session.query(vlanallocation.VlanAllocation).with_for_update())
with db_api.CONTEXT_WRITER.using(ctx):
physnets = vlanalloc.VlanAllocation.get_physical_networks(ctx)
physnets_unconfigured = physnets - set(self._assigned_vlans)
if physnets_unconfigured:
vlanalloc.VlanAllocation.delete_physical_networks(
ctx, physnets_unconfigured)
for alloc in allocs:
if alloc.physical_network != 'default':
session.delete(alloc)
allocations = collections.defaultdict(list)
for alloc in vlanalloc.VlanAllocation.get_objects(ctx):
allocations[alloc.physical_network].append(alloc)
try:
assigned_vlans.remove(alloc.vlan_id)
except KeyError:
session.delete(alloc)
continue
if alloc.allocated and alloc.vlan_id in available_vlans:
alloc.update({"allocated": False})
elif not alloc.allocated and alloc.vlan_id in used_vlans:
alloc.update({"allocated": True})
for vlan_id in sorted(assigned_vlans):
allocated = vlan_id in used_vlans
alloc = vlanallocation.VlanAllocation(
physical_network='default',
vlan_id=vlan_id,
allocated=allocated)
session.add(alloc)
for physical_network, vlan_ranges in self._assigned_vlans.items():
if physical_network in allocations:
for alloc in allocations[physical_network]:
try:
vlan_ranges.remove(alloc.vlan_id)
except KeyError:
alloc.delete()
vlanalloc.VlanAllocation.bulk_create(ctx, physical_network,
vlan_ranges)
LOG.debug('vlan_ranges: %(vlan)s', {'vlan': vlan_ranges})
for vlan_id in vlan_ranges:
allocated = (vlan_id not in cvx_available_vlans and
(vlan_id in cvx_used_vlans or vlan_id in
allocated_vlans[physical_network]))
LOG.debug('Updating %(phys)s %(vlan)s %(alloc)s',
{'phys': physical_network, 'vlan': vlan_id,
'alloc': allocated})
vlanalloc.VlanAllocation.update_objects(ctx,
values={'allocated': allocated,
'vlan_id': vlan_id,
'physical_network': physical_network},
physical_network=physical_network,
vlan_id=vlan_id)

View File

@ -13,14 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import mock
from mock import patch
from neutron_lib.db import api as db_api
from neutron_lib import context
from oslo_config import cfg
from neutron.db.models.plugins.ml2 import vlanallocation
from neutron.tests.unit.plugins.ml2.drivers import test_helpers
from neutron.tests.unit import testlib_api
@ -84,50 +81,119 @@ class AristaTypeDriverHelpersTest(test_helpers.HelpersTest):
class VlanSyncServiceTest(testlib_api.SqlTestCase):
"""Test that VLANs are synchronized between EOS and Neutron."""
def _ensure_in_db(self, assigned, allocated, available):
session = db_api.get_reader_session()
with session.begin():
vlans = session.query(vlanallocation.VlanAllocation).all()
for vlan in vlans:
self.assertIn(vlan.vlan_id, assigned)
def setUp(self):
super(VlanSyncServiceTest, self).setUp()
self.rpc = mock.MagicMock()
self.sync_service = VlanSyncService(self.rpc)
self.ctx = context.get_admin_context()
if vlan.vlan_id in available:
self.assertFalse(vlan.allocated)
elif vlan.vlan_id in allocated:
self.assertTrue(vlan.allocated)
def tearDown(self):
super(VlanSyncServiceTest, self).tearDown()
# Cleanup the db
utils.delete_vlan_allocation(self.ctx)
def _ensure_in_db(self, assigned, allocated, available):
vlans = utils.get_vlan_allocation(self.ctx)
self.assertEqual(len(vlans), len(assigned))
used_vlans = []
available_vlans = []
for vlan in vlans:
self.assertIn(vlan.vlan_id, assigned)
if vlan.vlan_id in available:
self.assertFalse(vlan.allocated)
available_vlans.append(vlan.vlan_id)
elif vlan.vlan_id in allocated:
self.assertTrue(vlan.allocated)
used_vlans.append(vlan.vlan_id)
self.assertEqual(set(used_vlans), set(allocated))
self.assertEqual(set(available_vlans), set(available))
def _get_vlan_allocations(self):
vlan_allocations = {
'available_vlans': [],
'allocated_vlans': [],
}
vlans = utils.get_vlan_allocation(self.ctx)
for vlan in vlans:
if vlan.allocated:
vlan_allocations['allocated_vlans'].append(vlan.vlan_id)
else:
vlan_allocations['available_vlans'].append(vlan.vlan_id)
return vlan_allocations
def test_synchronization_before_region_sync(self):
"""Test VLAN sync with empty data from CVX"""
# Populated VlanAllocations before starting the sync
for seg_id in range(2, 500):
utils.create_vlan_allocation(self.ctx, segmentation_id=seg_id)
self.rpc.get_vlan_allocation.return_value = {
'assignedVlans': '10, 100',
'availableVlans': '',
'allocatedVlans': ''
}
self.sync_service.synchronize()
self.assertTrue(self.sync_service._force_sync)
# Verify only assignedVlans exist in the db
vlans = self._get_vlan_allocations()
assigned_vlans = [10, 100]
allocated_vlans = []
self.assertEqual(set(vlans['available_vlans']), set(assigned_vlans))
self.assertEqual(set(vlans['allocated_vlans']), set(allocated_vlans))
def test_synchronization_test(self):
rpc = mock.MagicMock()
"""Test VLAN sync based on allocated VLANs in db and CVX"""
rpc.get_vlan_allocation.return_value = {
'assignedVlans': '1-10,21-30',
'availableVlans': '1-5,21,23,25,27,29',
'allocatedVlans': '6-10,22,24,26,28,30'
# Add entries to vlan allocation table
VLAN_MIN = 100
VLAN_MAX = 300
for seg_id in range(VLAN_MIN, VLAN_MAX + 1):
allocated = seg_id in [VLAN_MIN, VLAN_MAX]
utils.create_vlan_allocation(self.ctx, segmentation_id=seg_id,
allocated=allocated)
# Test case that vlan resource pool does not have allocated vlans
self.rpc.get_vlan_allocation.return_value = {
'assignedVlans': '10-20, 50-60, %s, %s' % (VLAN_MIN, VLAN_MAX),
'availableVlans': '10-20, 50-60',
'allocatedVlans': ''
}
self.sync_service.synchronize()
assigned = list(itertools.chain(range(1, 11), range(21, 31)))
self.assertFalse(self.sync_service._force_sync)
available = [1, 2, 3, 4, 5, 21, 23, 25, 27, 29]
allocated = list(set(assigned) - set(available))
allocated_vlans = [VLAN_MIN, VLAN_MAX]
available_vlans = list(set(range(10, 21)) | set(range(50, 61)))
assigned_vlans = list(set(available_vlans) | set(allocated_vlans))
self._ensure_in_db(assigned_vlans, allocated_vlans, available_vlans)
sync_service = VlanSyncService(rpc)
sync_service.synchronize()
# Test case that vlan resource pool has updated resources
self.rpc.get_vlan_allocation.return_value = {
'assignedVlans': '200-220, %s, %s' % (VLAN_MIN, VLAN_MAX),
'availableVlans': '200-220',
'allocatedVlans': '%s, %s' % (VLAN_MIN, VLAN_MAX)
}
available_vlans = list(set(range(200, 221)))
assigned_vlans = list(set(available_vlans) | set(allocated_vlans))
self.sync_service.synchronize()
self._ensure_in_db(assigned_vlans, allocated_vlans, available_vlans)
self._ensure_in_db(assigned, allocated, available)
def test_synchronization_test_with_data_from_cvx(self):
"""Test VLAN sync based on data from CVX"""
# Call synchronize again which returns different data
rpc.get_vlan_allocation.return_value = {
self.rpc.get_vlan_allocation.return_value = {
'assignedVlans': '51-60,71-80',
'availableVlans': '51-55,71,73,75,77,79',
'allocatedVlans': '56-60,72,74,76,78,80'
}
assigned_vlans = list(set(range(51, 61)) | set(range(71, 81)))
available_vlans = [51, 52, 53, 54, 55, 71, 73, 75, 77, 79]
allocated_vlans = list(set(assigned_vlans) - set(available_vlans))
self.sync_service.synchronize()
assigned = list(itertools.chain(range(51, 61), range(71, 81)))
self.assertFalse(self.sync_service._force_sync)
available = [51, 52, 53, 54, 55, 71, 73, 75, 77, 79]
allocated = list(set(assigned) - set(available))
sync_service = VlanSyncService(rpc)
sync_service.synchronize()
self._ensure_in_db(assigned, allocated, available)
self._ensure_in_db(assigned_vlans, allocated_vlans, available_vlans)

View File

@ -27,6 +27,7 @@ from neutron.db.models import l3 as l3_models
from neutron.db.models import l3ha as l3ha_models
from neutron.db.models import segment as segment_models
from neutron.db import models_v2
from neutron.objects.plugins.ml2 import vlanallocation as vlan_alloc_obj
from neutron.plugins.ml2 import models as ml2_models
from neutron.services.trunk import models as t_models
@ -340,6 +341,7 @@ def create_networks(networks):
with session.begin():
for network in networks:
session.add(models_v2.Network(**network))
session.commit()
def delete_network(network_id):
@ -348,6 +350,7 @@ def delete_network(network_id):
network_model = models_v2.Network
session.query(network_model).filter(
network_model.id == network_id).delete()
session.commit()
def delete_networks_for_tenant(tenant_id):
@ -359,6 +362,7 @@ def delete_networks_for_tenant(tenant_id):
for network in networks:
delete_ports_on_network(network.id)
session.delete(network)
session.commit()
# Segment utils #
@ -369,6 +373,7 @@ def create_segments(segments):
with session.begin():
for segment in segments:
session.add(segment_models.NetworkSegment(**segment))
session.commit()
def delete_segment(segment_id):
@ -377,6 +382,7 @@ def delete_segment(segment_id):
segment_model = segment_models.NetworkSegment
session.query(segment_model).filter(
segment_model.id == segment_id).delete()
session.commit()
def delete_segments_for_network(network_id):
@ -385,6 +391,7 @@ def delete_segments_for_network(network_id):
segment_model = segment_models.NetworkSegment
session.query(segment_model).filter(
segment_model.network_id == network_id).delete()
session.commit()
def delete_segments_for_tenant(tenant_id):
@ -397,6 +404,7 @@ def delete_segments_for_tenant(tenant_id):
for network in networks:
session.query(segment_model).filter(
segment_model.network_id == network.id).delete()
session.commit()
# Port utils #
@ -424,6 +432,7 @@ def create_ports(ports):
for binding_level in binding_levels:
binding_level['port_id'] = port['id']
session.add(ml2_models.PortBindingLevel(**binding_level))
session.commit()
def delete_port(port_id):
@ -432,6 +441,7 @@ def delete_port(port_id):
port_model = models_v2.Port
session.query(port_model).filter(
port_model.id == port_id).delete()
session.commit()
def delete_ports_on_network(network_id):
@ -440,6 +450,7 @@ def delete_ports_on_network(network_id):
port_model = models_v2.Port
session.query(port_model).filter(
port_model.network_id == network_id).delete()
session.commit()
def delete_ports_for_instance(instance_id):
@ -448,6 +459,7 @@ def delete_ports_for_instance(instance_id):
port_model = models_v2.Port
session.query(port_model).filter(
port_model.device_id == instance_id).delete()
session.commit()
def delete_ports_for_tenant(tenant_id):
@ -456,6 +468,7 @@ def delete_ports_for_tenant(tenant_id):
port_model = models_v2.Port
session.query(port_model).filter(
port_model.project_id == tenant_id).delete()
session.commit()
# Port binding utils #
@ -484,6 +497,7 @@ def delete_port_binding(port_id, host):
dpb_model.host == host))
for binding in bindings:
session.delete(binding)
session.commit()
def remove_switch_binding(port_id, switch_id, intf_id):
@ -501,6 +515,7 @@ def remove_switch_binding(port_id, switch_id, intf_id):
binding.profile = json.dumps(profile)
if len(lli) == 0:
delete_port_binding(port_id, binding.host)
session.commit()
# Trunk utils #
@ -511,6 +526,7 @@ def create_trunks(trunks):
with session.begin():
for trunk in trunks:
session.add(t_models.Trunk(**trunk))
session.commit()
def create_subports(subports):
@ -518,6 +534,7 @@ def create_subports(subports):
with session.begin():
for subport in subports:
session.add(t_models.SubPort(**subport))
session.commit()
# L3 HA Router utils #
@ -527,6 +544,7 @@ def create_ha_routers(routers):
with session.begin():
for router in routers:
session.add(l3_models.Router(**router))
session.commit()
def create_ha_router_networks(networks):
@ -534,6 +552,7 @@ def create_ha_router_networks(networks):
with session.begin():
for network in networks:
session.add(l3ha_models.L3HARouterNetwork(**network))
session.commit()
def delete_ha_router_for_tenant(tenant_id):
@ -544,6 +563,7 @@ def delete_ha_router_for_tenant(tenant_id):
router_model.project_id == tenant_id).all()
for router in routers:
session.delete(router)
session.commit()
def setup_scenario():
@ -817,3 +837,22 @@ def setup_scenario():
create_ports(ports)
create_trunks(trunks)
create_subports(subports)
# VlanAllocation utils
def create_vlan_allocation(ctx, segmentation_id, physical_network='default',
allocated=False):
attr = {'physical_network': physical_network,
'allocated': allocated,
'vlan_id': segmentation_id}
alloc = vlan_alloc_obj.VlanAllocation(ctx, **attr)
alloc.create()
def get_vlan_allocation(ctx):
return vlan_alloc_obj.VlanAllocation.get_objects(ctx)
def delete_vlan_allocation(ctx):
vlan_alloc_obj.VlanAllocation.delete_objects(ctx)