Merge "Override 'create' for Trunk and SubPort"

This commit is contained in:
Jenkins 2016-06-22 17:56:48 +00:00 committed by Gerrit Code Review
commit 6771e70419
4 changed files with 157 additions and 13 deletions

View File

@ -13,10 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import exceptions as n_exc
from oslo_db import exception as o_db_exc
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import api as db_api
from neutron.objects import base
from neutron.services.trunk import exceptions as t_exc
from neutron.services.trunk import models
@ -27,7 +31,8 @@ class SubPort(base.NeutronDbObject):
db_model = models.SubPort
primary_keys = ['port_id', 'trunk_id']
primary_keys = ['port_id']
foreign_keys = {'trunk_id': 'id'}
fields = {
'port_id': obj_fields.UUIDField(),
@ -38,6 +43,30 @@ class SubPort(base.NeutronDbObject):
fields_no_update = ['segmentation_type', 'segmentation_id']
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
try:
super(SubPort, self).create()
except o_db_exc.DBReferenceError as ex:
if ex.key_table is None:
# NOTE(ivc): 'key_table' is provided by 'oslo.db' [1]
# only for a limited set of database backends (i.e.
# MySQL and PostgreSQL). Other database backends
# (including SQLite) would have 'key_table' set to None.
# We emulate the 'key_table' support for such database
# backends.
#
# [1] https://github.com/openstack/oslo.db/blob/3fadd5a
# /oslo_db/sqlalchemy/exc_filters.py#L190-L203
if not Trunk.get_object(self.obj_context,
id=self.trunk_id):
ex.key_table = Trunk.db_model.__tablename__
if ex.key_table == Trunk.db_model.__tablename__:
raise t_exc.TrunkNotFound(trunk_id=self.trunk_id)
raise n_exc.PortNotFound(port_id=self.port_id)
@obj_base.VersionedObjectRegistry.register
class Trunk(base.NeutronDbObject):
@ -56,3 +85,22 @@ class Trunk(base.NeutronDbObject):
fields_no_update = ['tenant_id', 'port_id']
synthetic_fields = ['sub_ports']
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
sub_ports = []
if self.obj_attr_is_set('sub_ports'):
sub_ports = self.sub_ports
try:
super(Trunk, self).create()
except o_db_exc.DBReferenceError:
raise n_exc.PortNotFound(port_id=self.port_id)
for sub_port in sub_ports:
sub_port.trunk_id = self.id
sub_port.create()
self.load_synthetic_db_fields()
# TODO(ivc): add support for 'sub_ports' in 'Trunk.update' for
# consistency with 'Trunk.create'

View File

@ -20,3 +20,7 @@ from neutron_lib import exceptions as n_exc
class TrunkPortInUse(n_exc.InUse):
message = _("Port %(port_id)s is in use by another trunk.")
class TrunkNotFound(n_exc.NotFound):
message = _("Trunk %(trunk_id)s could not be found.")

View File

@ -12,6 +12,7 @@
import collections
import copy
import itertools
import random
import mock
@ -710,18 +711,33 @@ class BaseDbObjectTestCase(_BaseObjectTestCase):
self._subnet = subnet.Subnet(self.context, **test_subnet)
self._subnet.create()
def _create_test_port(self, network):
def _create_port(self, **port_attrs):
if not hasattr(self, '_mac_address_generator'):
self._mac_address_generator = (":".join(["%02x" % i] * 6)
for i in itertools.count())
if not hasattr(self, '_port_name_generator'):
self._port_name_generator = ("test-port%d" % i
for i in itertools.count(1))
attrs = {'tenant_id': 'fake_tenant_id',
'admin_state_up': True,
'status': 'ACTIVE',
'device_id': 'fake_device',
'device_owner': 'fake_owner'}
attrs.update(port_attrs)
if 'name' not in attrs:
attrs['name'] = next(self._port_name_generator)
if 'mac_address' not in attrs:
attrs['mac_address'] = next(self._mac_address_generator)
# TODO(ihrachys): replace with port.create() once we get an object
# implementation for ports
self._port = obj_db_api.create_object(self.context, models_v2.Port,
{'tenant_id': 'fake_tenant_id',
'name': 'test-port1',
'network_id': network['id'],
'mac_address': 'fake_mac',
'admin_state_up': True,
'status': 'ACTIVE',
'device_id': 'fake_device',
'device_owner': 'fake_owner'})
return obj_db_api.create_object(self.context, models_v2.Port, attrs)
def _create_test_port(self, network):
self._port = self._create_port(network_id=network['id'])
def _make_object(self, fields):
return self._test_class(

View File

@ -13,8 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import exceptions as n_exc
from oslo_utils import uuidutils
from neutron.objects import trunk as t_obj
from neutron.services.trunk import exceptions as t_exc
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class SubPortObjectTestCase(test_base.BaseObjectIfaceTestCase):
@ -22,16 +27,87 @@ class SubPortObjectTestCase(test_base.BaseObjectIfaceTestCase):
_test_class = t_obj.SubPort
class SubPortDbObjectTestCase(test_base.BaseDbObjectTestCase):
class SubPortDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = t_obj.SubPort
def setUp(self):
super(SubPortDbObjectTestCase, self).setUp()
self._create_test_network()
for obj in self.obj_fields:
self._create_port(id=obj['port_id'],
network_id=self._network['id'])
self._create_trunk(trunk_id=obj['trunk_id'])
def _create_trunk(self, trunk_id):
port_id = uuidutils.generate_uuid()
self._create_port(id=port_id, network_id=self._network['id'])
trunk = t_obj.Trunk(self.context, id=trunk_id, port_id=port_id)
trunk.create()
def test_create_port_not_found(self):
obj = self.obj_fields[0]
obj['port_id'] = uuidutils.generate_uuid()
sub_port = self._make_object(obj)
self.assertRaises(n_exc.PortNotFound, sub_port.create)
def test_create_trunk_not_found(self):
obj = self.obj_fields[0]
obj['trunk_id'] = uuidutils.generate_uuid()
sub_port = self._make_object(obj)
self.assertRaises(t_exc.TrunkNotFound, sub_port.create)
class TrunkObjectTestCase(test_base.BaseObjectIfaceTestCase):
_test_class = t_obj.Trunk
class TrunkDbObjectTestCase(test_base.BaseDbObjectTestCase):
class TrunkDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = t_obj.Trunk
def setUp(self):
super(TrunkDbObjectTestCase, self).setUp()
self._create_test_network()
for obj in self.obj_fields:
self._create_port(id=obj['port_id'],
network_id=self._network['id'])
def test_create_port_not_found(self):
obj = self.obj_fields[0]
obj['port_id'] = uuidutils.generate_uuid()
trunk = self._make_object(obj)
self.assertRaises(n_exc.PortNotFound, trunk.create)
def test_create_with_sub_ports(self):
tenant_id = uuidutils.generate_uuid()
def _as_tuple(sub_port):
return (sub_port['port_id'],
sub_port['segmentation_type'],
sub_port['segmentation_id'])
sub_ports = []
for vid in range(1, 3):
port = self._create_port(network_id=self._network['id'])
sub_ports.append(t_obj.SubPort(self.context, port_id=port['id'],
segmentation_type='vlan',
segmentation_id=vid))
expected = set(map(_as_tuple, sub_ports))
trunk = t_obj.Trunk(self.context, port_id=self.db_obj['port_id'],
sub_ports=sub_ports, tenant_id=tenant_id)
trunk.create()
sub_ports = t_obj.SubPort.get_objects(self.context, trunk_id=trunk.id)
self.assertEqual(expected, set(map(_as_tuple, trunk.sub_ports)))
self.assertEqual(expected, set(map(_as_tuple, sub_ports)))