Network RBAC DB setup and legacy migration

This patch implements the database model required for the network
RBAC work. In addition it migrates the current network and subnet
'shared' attributes to leverage the new table.

'shared' is no longer a property of the DB model because its status
is based on the tenant ID of the API caller. From an API perspective
this is the same (tenants will see networks as 'shared=True' if the
network is shared with them). However, internal callers (e.g. plugins,
drivers, services) will not be able to check for the 'shared' attribute
on network and subnet db objects any more.

This patch just achieves parity with the current shared behavior so it
doesn't add the ability to manipulate the RBAC entries directly. The
RBAC API is in the following patch.

Partially-Implements: blueprint rbac-networks
Change-Id: I3426b13eede8bfa29729cf3efea3419fb91175c4
This commit is contained in:
Kevin Benton 2015-06-15 02:18:36 -07:00
parent eb9226214f
commit 3e0328b992
13 changed files with 367 additions and 39 deletions

View File

@ -16,6 +16,8 @@
import weakref
import six
from sqlalchemy import and_
from sqlalchemy import or_
from sqlalchemy import sql
from neutron.common import exceptions as n_exc
@ -98,7 +100,15 @@ class CommonDbMixin(object):
# define basic filter condition for model query
query_filter = None
if self.model_query_scope(context, model):
if hasattr(model, 'shared'):
if hasattr(model, 'rbac_entries'):
rbac_model, join_params = self._get_rbac_query_params(model)
query = query.outerjoin(*join_params)
query_filter = (
(model.tenant_id == context.tenant_id) |
((rbac_model.action == 'access_as_shared') &
((rbac_model.target_tenant == context.tenant_id) |
(rbac_model.target_tenant == '*'))))
elif hasattr(model, 'shared'):
query_filter = ((model.tenant_id == context.tenant_id) |
(model.shared == sql.true()))
else:
@ -145,15 +155,47 @@ class CommonDbMixin(object):
query = self._model_query(context, model)
return query.filter(model.id == id).one()
def _apply_filters_to_query(self, query, model, filters):
@staticmethod
def _get_rbac_query_params(model):
"""Return the class and join params for the rbac relationship."""
try:
cls = model.rbac_entries.property.mapper.class_
return (cls, (cls, ))
except AttributeError:
# an association proxy is being used (e.g. subnets
# depends on network's rbac entries)
rbac_model = (model.rbac_entries.target_class.
rbac_entries.property.mapper.class_)
return (rbac_model, model.rbac_entries.attr)
def _apply_filters_to_query(self, query, model, filters, context=None):
if filters:
for key, value in six.iteritems(filters):
column = getattr(model, key, None)
if column:
# NOTE(kevinbenton): if column is a hybrid property that
# references another expression, attempting to convert to
# a boolean will fail so we must compare to None.
# See "An Important Expression Language Gotcha" in:
# docs.sqlalchemy.org/en/rel_0_9/changelog/migration_06.html
if column is not None:
if not value:
query = query.filter(sql.false())
return query
query = query.filter(column.in_(value))
elif key == 'shared' and hasattr(model, 'rbac_entries'):
# translate a filter on shared into a query against the
# object's rbac entries
rbac, join_params = self._get_rbac_query_params(model)
query = query.outerjoin(*join_params, aliased=True)
matches = [rbac.target_tenant == '*']
if context:
matches.append(rbac.target_tenant == context.tenant_id)
is_shared = and_(
~rbac.object_id.is_(None),
rbac.action == 'access_as_shared',
or_(*matches)
)
query = query.filter(is_shared if value[0] else ~is_shared)
for _nam, hooks in six.iteritems(self._model_query_hooks.get(model,
{})):
result_filter = hooks.get('result_filters', None)
@ -181,7 +223,8 @@ class CommonDbMixin(object):
sorts=None, limit=None, marker_obj=None,
page_reverse=False):
collection = self._model_query(context, model)
collection = self._apply_filters_to_query(collection, model, filters)
collection = self._apply_filters_to_query(collection, model, filters,
context)
if limit and page_reverse and sorts:
sorts = [(s[0], not s[1]) for s in sorts]
collection = sqlalchemyutils.paginate_query(collection, model, limit,

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy.orm import exc
@ -72,7 +74,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
)
context.session.add(allocated)
def _make_subnet_dict(self, subnet, fields=None):
def _make_subnet_dict(self, subnet, fields=None, context=None):
res = {'id': subnet['id'],
'name': subnet['name'],
'tenant_id': subnet['tenant_id'],
@ -92,8 +94,10 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
'host_routes': [{'destination': route['destination'],
'nexthop': route['nexthop']}
for route in subnet['routes']],
'shared': subnet['shared']
}
# The shared attribute for a subnet is the same as its parent network
res['shared'] = self._make_network_dict(subnet.networks,
context=context)['shared']
# Call auxiliary extend functions, if any
self._apply_dict_extend_functions(attributes.SUBNETS, res, subnet)
return self._fields(res, fields)
@ -196,8 +200,10 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'subnet', limit, marker)
make_subnet_dict = functools.partial(self._make_subnet_dict,
context=context)
return self._get_collection(context, models_v2.Subnet,
self._make_subnet_dict,
make_subnet_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
@ -205,16 +211,24 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
page_reverse=page_reverse)
def _make_network_dict(self, network, fields=None,
process_extensions=True):
process_extensions=True, context=None):
res = {'id': network['id'],
'name': network['name'],
'tenant_id': network['tenant_id'],
'admin_state_up': network['admin_state_up'],
'mtu': network.get('mtu', constants.DEFAULT_NETWORK_MTU),
'status': network['status'],
'shared': network['shared'],
'subnets': [subnet['id']
for subnet in network['subnets']]}
# The shared attribute for a network now reflects if the network
# is shared to the calling tenant via an RBAC entry.
shared = False
for entry in network.rbac_entries:
if (entry.action == 'access_as_shared' and
entry.target_tenant in ('*', context.tenant_id)):
shared = True
break
res['shared'] = shared
# TODO(pritesh): Move vlan_transparent to the extension module.
# vlan_transparent here is only added if the vlantransparent
# extension is enabled.
@ -227,8 +241,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
attributes.NETWORKS, res, network)
return self._fields(res, fields)
def _make_subnet_args(self, shared, detail,
subnet, subnetpool_id):
def _make_subnet_args(self, detail, subnet, subnetpool_id):
gateway_ip = str(detail.gateway_ip) if detail.gateway_ip else None
args = {'tenant_id': detail.tenant_id,
'id': detail.subnet_id,
@ -238,8 +251,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
'cidr': str(detail.subnet_cidr),
'subnetpool_id': subnetpool_id,
'enable_dhcp': subnet['enable_dhcp'],
'gateway_ip': gateway_ip,
'shared': shared}
'gateway_ip': gateway_ip}
if subnet['ip_version'] == 6 and subnet['enable_dhcp']:
if attributes.is_attr_set(subnet['ipv6_ra_mode']):
args['ipv6_ra_mode'] = subnet['ipv6_ra_mode']

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import netaddr
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -35,6 +37,7 @@ from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import ipam_non_pluggable_backend
from neutron.db import models_v2
from neutron.db import rbac_db_models as rbac_db
from neutron.db import sqlalchemyutils
from neutron.extensions import l3
from neutron.i18n import _LE, _LI
@ -235,7 +238,6 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
'name': n['name'],
'admin_state_up': n['admin_state_up'],
'mtu': n.get('mtu', constants.DEFAULT_NETWORK_MTU),
'shared': n['shared'],
'status': n.get('status', constants.NET_STATUS_ACTIVE)}
# TODO(pritesh): Move vlan_transparent to the extension module.
# vlan_transparent here is only added if the vlantransparent
@ -244,8 +246,14 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
attributes.ATTR_NOT_SPECIFIED):
args['vlan_transparent'] = n['vlan_transparent']
network = models_v2.Network(**args)
if n['shared']:
entry = rbac_db.NetworkRBAC(
network=network, action='access_as_shared',
target_tenant='*', tenant_id=network['tenant_id'])
context.session.add(entry)
context.session.add(network)
return self._make_network_dict(network, process_extensions=False)
return self._make_network_dict(network, process_extensions=False,
context=context)
def update_network(self, context, id, network):
n = network['network']
@ -253,13 +261,25 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
network = self._get_network(context, id)
# validate 'shared' parameter
if 'shared' in n:
entry = None
for item in network.rbac_entries:
if (item.action == 'access_as_shared' and
item.target_tenant == '*'):
entry = item
break
setattr(network, 'shared', True if entry else False)
self._validate_shared_update(context, id, network, n)
update_shared = n.pop('shared')
if update_shared and not entry:
entry = rbac_db.NetworkRBAC(
network=network, action='access_as_shared',
target_tenant='*', tenant_id=network['tenant_id'])
context.session.add(entry)
elif not update_shared and entry:
context.session.delete(entry)
context.session.expire(network, ['rbac_entries'])
network.update(n)
# also update shared in all the subnets for this network
subnets = self._get_subnets_by_network(context, id)
for subnet in subnets:
subnet['shared'] = network['shared']
return self._make_network_dict(network)
return self._make_network_dict(network, context=context)
def delete_network(self, context, id):
with context.session.begin(subtransactions=True):
@ -285,14 +305,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def get_network(self, context, id, fields=None):
network = self._get_network(context, id)
return self._make_network_dict(network, fields)
return self._make_network_dict(network, fields, context=context)
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'network', limit, marker)
make_network_dict = functools.partial(self._make_network_dict,
context=context)
return self._get_collection(context, models_v2.Network,
self._make_network_dict,
make_network_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
@ -460,7 +482,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# internal ports on the network with addresses for this subnet.
if ipv6_utils.is_auto_address_subnet(subnet):
self.ipam.add_auto_addrs_on_network_ports(context, subnet)
return self._make_subnet_dict(subnet)
return self._make_subnet_dict(subnet, context=context)
def _get_subnetpool_id(self, subnet):
"""Returns the subnetpool id for this request
@ -554,7 +576,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
with context.session.begin(subtransactions=True):
subnet, changes = self.ipam.update_db_subnet(context, id, s)
result = self._make_subnet_dict(subnet)
result = self._make_subnet_dict(subnet, context=context)
# Keep up with fields that changed
result.update(changes)
return result
@ -634,7 +656,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def get_subnet(self, context, id, fields=None):
subnet = self._get_subnet(context, id)
return self._make_subnet_dict(subnet, fields)
return self._make_subnet_dict(subnet, fields, context=context)
def get_subnets(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
@ -914,7 +936,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if subnet_ids:
query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
query = self._apply_filters_to_query(query, Port, filters)
query = self._apply_filters_to_query(query, Port, filters, context)
if limit and page_reverse and sorts:
sorts = [(s[0], not s[1]) for s in sorts]
query = sqlalchemyutils.paginate_query(query, Port, limit,

View File

@ -499,7 +499,6 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
subnet = self._save_subnet(context,
network,
self._make_subnet_args(
network.shared,
subnet_request,
subnet,
subnetpool_id),

View File

@ -1,3 +1,3 @@
30018084ec99
313373c0ffee
4ffceebfada
8675309a5c4f
kilo

View File

@ -0,0 +1,69 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""network_rbac
Revision ID: 4ffceebfada
Revises: 30018084ec99
Create Date: 2015-06-14 13:12:04.012457
"""
# revision identifiers, used by Alembic.
revision = '4ffceebfada'
down_revision = '30018084ec99'
depends_on = ('8675309a5c4f',)
from alembic import op
from oslo_utils import uuidutils
import sqlalchemy as sa
# A simple model of the networks table with only the fields needed for
# the migration.
network = sa.Table('networks', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=255)),
sa.Column('shared', sa.Boolean(), nullable=False))
networkrbacs = sa.Table(
'networkrbacs', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('object_id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=255), nullable=True,
index=True),
sa.Column('target_tenant', sa.String(length=255), nullable=False),
sa.Column('action', sa.String(length=255), nullable=False))
def upgrade():
op.bulk_insert(networkrbacs, get_values())
op.drop_column('networks', 'shared')
# the shared column on subnets was just an internal representation of the
# shared status of the network it was related to. This is now handled by
# other logic so we just drop it.
op.drop_column('subnets', 'shared')
def get_values():
session = sa.orm.Session(bind=op.get_bind())
values = []
for row in session.query(network).filter(network.c.shared).all():
values.append({'id': uuidutils.generate_uuid(), 'object_id': row[0],
'tenant_id': row[1], 'target_tenant': '*',
'action': 'access_as_shared'})
# this commit appears to be necessary to allow further operations
session.commit()
return values

View File

@ -0,0 +1,47 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""network_rbac
Revision ID: 8675309a5c4f
Revises: 313373c0ffee
Create Date: 2015-06-14 13:12:04.012457
"""
# revision identifiers, used by Alembic.
revision = '8675309a5c4f'
down_revision = '313373c0ffee'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'networkrbacs',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('object_id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=255), nullable=True,
index=True),
sa.Column('target_tenant', sa.String(length=255), nullable=False),
sa.Column('action', sa.String(length=255), nullable=False),
sa.ForeignKeyConstraint(['object_id'],
['networks.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint(
'action', 'object_id', 'target_tenant',
name='uniq_networkrbacs0tenant_target0object_id0action'))

View File

@ -41,6 +41,7 @@ from neutron.db import models_v2 # noqa
from neutron.db import portbindings_db # noqa
from neutron.db import portsecurity_db # noqa
from neutron.db import quota_db # noqa
from neutron.db import rbac_db_models # noqa
from neutron.db import securitygroups_db # noqa
from neutron.db import servicetype_db # noqa
from neutron.ipam.drivers.neutrondb_ipam import db_models # noqa

View File

@ -15,6 +15,7 @@
from oslo_utils import uuidutils
import sqlalchemy as sa
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import orm
from neutron.api.v2 import attributes as attr
@ -205,7 +206,6 @@ class Subnet(model_base.BASEV2, HasId, HasTenant):
backref='subnet',
cascade='all, delete, delete-orphan',
lazy='joined')
shared = sa.Column(sa.Boolean)
ipv6_ra_mode = sa.Column(sa.Enum(constants.IPV6_SLAAC,
constants.DHCPV6_STATEFUL,
constants.DHCPV6_STATELESS,
@ -214,6 +214,7 @@ class Subnet(model_base.BASEV2, HasId, HasTenant):
constants.DHCPV6_STATEFUL,
constants.DHCPV6_STATELESS,
name='ipv6_address_modes'), nullable=True)
rbac_entries = association_proxy('networks', 'rbac_entries')
class SubnetPoolPrefix(model_base.BASEV2):
@ -251,10 +252,13 @@ class Network(model_base.BASEV2, HasId, HasTenant):
name = sa.Column(sa.String(attr.NAME_MAX_LEN))
ports = orm.relationship(Port, backref='networks')
subnets = orm.relationship(Subnet, backref='networks',
lazy="joined")
subnets = orm.relationship(
Subnet, backref=orm.backref('networks', lazy='joined'),
lazy="joined")
status = sa.Column(sa.String(16))
admin_state_up = sa.Column(sa.Boolean)
shared = sa.Column(sa.Boolean)
mtu = sa.Column(sa.Integer, nullable=True)
vlan_transparent = sa.Column(sa.Boolean, nullable=True)
rbac_entries = orm.relationship("NetworkRBAC", backref='network',
lazy='joined',
cascade='all, delete, delete-orphan')

View File

@ -0,0 +1,85 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import sqlalchemy as sa
from sqlalchemy.orm import validates
from neutron.common import exceptions as n_exc
from neutron.db import model_base
from neutron.db import models_v2
class InvalidActionForType(n_exc.InvalidInput):
message = _("Invalid action '%(action)s' for object type "
"'%(object_type)s'. Valid actions: %(valid_actions)s")
class RBACColumns(models_v2.HasId, models_v2.HasTenant):
"""Mixin that object-specific RBAC tables should inherit.
All RBAC tables should inherit directly from this one because
the RBAC code uses the __subclasses__() method to discover the
RBAC types.
"""
# the target_tenant is the subject that the policy will affect. this may
# also be a wildcard '*' to indicate all tenants or it may be a role if
# neutron gets better integration with keystone
target_tenant = sa.Column(sa.String(255), nullable=False)
action = sa.Column(sa.String(255), nullable=False)
@abc.abstractproperty
def object_type(self):
# this determines the name that users will use in the API
# to reference the type. sub-classes should set their own
pass
__table_args__ = (
sa.UniqueConstraint('target_tenant', 'object_id', 'action'),
model_base.BASEV2.__table_args__
)
@validates('action')
def _validate_action(self, key, action):
if action not in self.get_valid_actions():
raise InvalidActionForType(
action=action, object_type=self.object_type,
valid_actions=self.get_valid_actions())
return action
@abc.abstractmethod
def get_valid_actions(self):
# object table needs to override this to return an interable
# with the valid actions rbac entries
pass
def get_type_model_map():
return {table.object_type: table for table in RBACColumns.__subclasses__()}
class NetworkRBAC(RBACColumns, model_base.BASEV2):
"""RBAC table for networks."""
object_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
nullable=False)
object_type = 'network'
def get_valid_actions(self):
return ('access_as_shared',)

View File

@ -846,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
while True:
with session.begin(subtransactions=True):
record = self._get_subnet(context, id)
subnet = self._make_subnet_dict(record, None)
subnet = self._make_subnet_dict(record, None, context=context)
qry_allocated = (session.query(models_v2.IPAllocation).
filter_by(subnet_id=id).
join(models_v2.Port))

View File

@ -32,6 +32,49 @@ class SharedNetworksTest(base.BaseAdminNetworkTest):
super(SharedNetworksTest, cls).resource_setup()
cls.shared_network = cls.create_shared_network()
@test.idempotent_id('6661d219-b96d-4597-ad10-55766123421a')
def test_filtering_shared_networks(self):
# this test is necessary because the 'shared' column does not actually
# exist on networks so the filter function has to translate it into
# queries against the RBAC table
self.create_network()
self._check_shared_correct(
self.client.list_networks(shared=True)['networks'], True)
self._check_shared_correct(
self.admin_client.list_networks(shared=True)['networks'], True)
self._check_shared_correct(
self.client.list_networks(shared=False)['networks'], False)
self._check_shared_correct(
self.admin_client.list_networks(shared=False)['networks'], False)
def _check_shared_correct(self, items, shared):
self.assertNotEmpty(items)
self.assertTrue(all(n['shared'] == shared for n in items))
@test.idempotent_id('6661d219-b96d-4597-ad10-51672353421a')
def test_filtering_shared_subnets(self):
# shared subnets need to be tested because their shared status isn't
# visible as a regular API attribute and it's solely dependent on the
# parent network
reg = self.create_network()
priv = self.create_subnet(reg, client=self.client)
shared = self.create_subnet(self.shared_network,
client=self.admin_client)
self.assertIn(shared, self.client.list_subnets(shared=True)['subnets'])
self.assertIn(shared,
self.admin_client.list_subnets(shared=True)['subnets'])
self.assertNotIn(priv,
self.client.list_subnets(shared=True)['subnets'])
self.assertNotIn(priv,
self.admin_client.list_subnets(shared=True)['subnets'])
self.assertIn(priv, self.client.list_subnets(shared=False)['subnets'])
self.assertIn(priv,
self.admin_client.list_subnets(shared=False)['subnets'])
self.assertNotIn(shared,
self.client.list_subnets(shared=False)['subnets'])
self.assertNotIn(shared,
self.admin_client.list_subnets(shared=False)['subnets'])
@test.idempotent_id('6661d219-b96d-4597-ad10-55766ce4abf7')
def test_create_update_shared_network(self):
shared_network = self.create_shared_network()

View File

@ -2293,7 +2293,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
# must query db to see whether subnet's shared attribute
# has been updated or not
ctx = context.Context('', '', is_admin=True)
subnet_db = manager.NeutronManager.get_plugin()._get_subnet(
subnet_db = manager.NeutronManager.get_plugin().get_subnet(
ctx, subnet['subnet']['id'])
self.assertEqual(subnet_db['shared'], True)
@ -3806,13 +3806,16 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self.subnet(network=network) as v4_subnet,\
self.port(subnet=v4_subnet, device_owner=device_owner) as port:
if insert_db_reference_error:
def db_ref_err_for_ipalloc(instance):
orig = orm.Session.add
def db_ref_err_for_ipalloc(s, instance):
if instance.__class__.__name__ == 'IPAllocation':
raise db_exc.DBReferenceError(
'dummy_table', 'dummy_constraint',
'dummy_key', 'dummy_key_table')
return orig(s, instance)
mock.patch.object(orm.Session, 'add',
side_effect=db_ref_err_for_ipalloc).start()
new=db_ref_err_for_ipalloc).start()
mock.patch.object(non_ipam.IpamNonPluggableBackend,
'_get_subnet',
return_value=mock.Mock()).start()
@ -5323,8 +5326,8 @@ class DbModelTestCase(base.BaseTestCase):
exp_middle = "[object at %x]" % id(network)
exp_end_with = (" {tenant_id=None, id=None, "
"name='net_net', status='OK', "
"admin_state_up=True, shared=None, "
"mtu=None, vlan_transparent=None}>")
"admin_state_up=True, mtu=None, "
"vlan_transparent=None}>")
final_exp = exp_start_with + exp_middle + exp_end_with
self.assertEqual(actual_repr_output, final_exp)