Merge "Network RBAC DB setup and legacy migration"

This commit is contained in:
Jenkins 2015-07-17 16:56:28 +00:00 committed by Gerrit Code Review
commit dbe7cac34b
13 changed files with 367 additions and 39 deletions

View File

@ -16,6 +16,8 @@
import weakref
import six
from sqlalchemy import and_
from sqlalchemy import or_
from sqlalchemy import sql
from neutron.common import exceptions as n_exc
@ -98,7 +100,15 @@ class CommonDbMixin(object):
# define basic filter condition for model query
query_filter = None
if self.model_query_scope(context, model):
if hasattr(model, 'shared'):
if hasattr(model, 'rbac_entries'):
rbac_model, join_params = self._get_rbac_query_params(model)
query = query.outerjoin(*join_params)
query_filter = (
(model.tenant_id == context.tenant_id) |
((rbac_model.action == 'access_as_shared') &
((rbac_model.target_tenant == context.tenant_id) |
(rbac_model.target_tenant == '*'))))
elif hasattr(model, 'shared'):
query_filter = ((model.tenant_id == context.tenant_id) |
(model.shared == sql.true()))
else:
@ -145,15 +155,47 @@ class CommonDbMixin(object):
query = self._model_query(context, model)
return query.filter(model.id == id).one()
def _apply_filters_to_query(self, query, model, filters):
@staticmethod
def _get_rbac_query_params(model):
"""Return the class and join params for the rbac relationship."""
try:
cls = model.rbac_entries.property.mapper.class_
return (cls, (cls, ))
except AttributeError:
# an association proxy is being used (e.g. subnets
# depends on network's rbac entries)
rbac_model = (model.rbac_entries.target_class.
rbac_entries.property.mapper.class_)
return (rbac_model, model.rbac_entries.attr)
def _apply_filters_to_query(self, query, model, filters, context=None):
if filters:
for key, value in six.iteritems(filters):
column = getattr(model, key, None)
if column:
# NOTE(kevinbenton): if column is a hybrid property that
# references another expression, attempting to convert to
# a boolean will fail so we must compare to None.
# See "An Important Expression Language Gotcha" in:
# docs.sqlalchemy.org/en/rel_0_9/changelog/migration_06.html
if column is not None:
if not value:
query = query.filter(sql.false())
return query
query = query.filter(column.in_(value))
elif key == 'shared' and hasattr(model, 'rbac_entries'):
# translate a filter on shared into a query against the
# object's rbac entries
rbac, join_params = self._get_rbac_query_params(model)
query = query.outerjoin(*join_params, aliased=True)
matches = [rbac.target_tenant == '*']
if context:
matches.append(rbac.target_tenant == context.tenant_id)
is_shared = and_(
~rbac.object_id.is_(None),
rbac.action == 'access_as_shared',
or_(*matches)
)
query = query.filter(is_shared if value[0] else ~is_shared)
for _nam, hooks in six.iteritems(self._model_query_hooks.get(model,
{})):
result_filter = hooks.get('result_filters', None)
@ -181,7 +223,8 @@ class CommonDbMixin(object):
sorts=None, limit=None, marker_obj=None,
page_reverse=False):
collection = self._model_query(context, model)
collection = self._apply_filters_to_query(collection, model, filters)
collection = self._apply_filters_to_query(collection, model, filters,
context)
if limit and page_reverse and sorts:
sorts = [(s[0], not s[1]) for s in sorts]
collection = sqlalchemyutils.paginate_query(collection, model, limit,

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy.orm import exc
@ -72,7 +74,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
)
context.session.add(allocated)
def _make_subnet_dict(self, subnet, fields=None):
def _make_subnet_dict(self, subnet, fields=None, context=None):
res = {'id': subnet['id'],
'name': subnet['name'],
'tenant_id': subnet['tenant_id'],
@ -92,8 +94,10 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
'host_routes': [{'destination': route['destination'],
'nexthop': route['nexthop']}
for route in subnet['routes']],
'shared': subnet['shared']
}
# The shared attribute for a subnet is the same as its parent network
res['shared'] = self._make_network_dict(subnet.networks,
context=context)['shared']
# Call auxiliary extend functions, if any
self._apply_dict_extend_functions(attributes.SUBNETS, res, subnet)
return self._fields(res, fields)
@ -196,8 +200,10 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'subnet', limit, marker)
make_subnet_dict = functools.partial(self._make_subnet_dict,
context=context)
return self._get_collection(context, models_v2.Subnet,
self._make_subnet_dict,
make_subnet_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
@ -205,16 +211,24 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
page_reverse=page_reverse)
def _make_network_dict(self, network, fields=None,
process_extensions=True):
process_extensions=True, context=None):
res = {'id': network['id'],
'name': network['name'],
'tenant_id': network['tenant_id'],
'admin_state_up': network['admin_state_up'],
'mtu': network.get('mtu', constants.DEFAULT_NETWORK_MTU),
'status': network['status'],
'shared': network['shared'],
'subnets': [subnet['id']
for subnet in network['subnets']]}
# The shared attribute for a network now reflects if the network
# is shared to the calling tenant via an RBAC entry.
shared = False
for entry in network.rbac_entries:
if (entry.action == 'access_as_shared' and
entry.target_tenant in ('*', context.tenant_id)):
shared = True
break
res['shared'] = shared
# TODO(pritesh): Move vlan_transparent to the extension module.
# vlan_transparent here is only added if the vlantransparent
# extension is enabled.
@ -227,8 +241,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
attributes.NETWORKS, res, network)
return self._fields(res, fields)
def _make_subnet_args(self, shared, detail,
subnet, subnetpool_id):
def _make_subnet_args(self, detail, subnet, subnetpool_id):
gateway_ip = str(detail.gateway_ip) if detail.gateway_ip else None
args = {'tenant_id': detail.tenant_id,
'id': detail.subnet_id,
@ -238,8 +251,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
'cidr': str(detail.subnet_cidr),
'subnetpool_id': subnetpool_id,
'enable_dhcp': subnet['enable_dhcp'],
'gateway_ip': gateway_ip,
'shared': shared}
'gateway_ip': gateway_ip}
if subnet['ip_version'] == 6 and subnet['enable_dhcp']:
if attributes.is_attr_set(subnet['ipv6_ra_mode']):
args['ipv6_ra_mode'] = subnet['ipv6_ra_mode']

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import netaddr
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -35,6 +37,7 @@ from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import ipam_non_pluggable_backend
from neutron.db import models_v2
from neutron.db import rbac_db_models as rbac_db
from neutron.db import sqlalchemyutils
from neutron.extensions import l3
from neutron.i18n import _LE, _LI
@ -235,7 +238,6 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
'name': n['name'],
'admin_state_up': n['admin_state_up'],
'mtu': n.get('mtu', constants.DEFAULT_NETWORK_MTU),
'shared': n['shared'],
'status': n.get('status', constants.NET_STATUS_ACTIVE)}
# TODO(pritesh): Move vlan_transparent to the extension module.
# vlan_transparent here is only added if the vlantransparent
@ -244,8 +246,14 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
attributes.ATTR_NOT_SPECIFIED):
args['vlan_transparent'] = n['vlan_transparent']
network = models_v2.Network(**args)
if n['shared']:
entry = rbac_db.NetworkRBAC(
network=network, action='access_as_shared',
target_tenant='*', tenant_id=network['tenant_id'])
context.session.add(entry)
context.session.add(network)
return self._make_network_dict(network, process_extensions=False)
return self._make_network_dict(network, process_extensions=False,
context=context)
def update_network(self, context, id, network):
n = network['network']
@ -253,13 +261,25 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
network = self._get_network(context, id)
# validate 'shared' parameter
if 'shared' in n:
entry = None
for item in network.rbac_entries:
if (item.action == 'access_as_shared' and
item.target_tenant == '*'):
entry = item
break
setattr(network, 'shared', True if entry else False)
self._validate_shared_update(context, id, network, n)
update_shared = n.pop('shared')
if update_shared and not entry:
entry = rbac_db.NetworkRBAC(
network=network, action='access_as_shared',
target_tenant='*', tenant_id=network['tenant_id'])
context.session.add(entry)
elif not update_shared and entry:
context.session.delete(entry)
context.session.expire(network, ['rbac_entries'])
network.update(n)
# also update shared in all the subnets for this network
subnets = self._get_subnets_by_network(context, id)
for subnet in subnets:
subnet['shared'] = network['shared']
return self._make_network_dict(network)
return self._make_network_dict(network, context=context)
def delete_network(self, context, id):
with context.session.begin(subtransactions=True):
@ -285,14 +305,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def get_network(self, context, id, fields=None):
network = self._get_network(context, id)
return self._make_network_dict(network, fields)
return self._make_network_dict(network, fields, context=context)
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
marker_obj = self._get_marker_obj(context, 'network', limit, marker)
make_network_dict = functools.partial(self._make_network_dict,
context=context)
return self._get_collection(context, models_v2.Network,
self._make_network_dict,
make_network_dict,
filters=filters, fields=fields,
sorts=sorts,
limit=limit,
@ -460,7 +482,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# internal ports on the network with addresses for this subnet.
if ipv6_utils.is_auto_address_subnet(subnet):
self.ipam.add_auto_addrs_on_network_ports(context, subnet)
return self._make_subnet_dict(subnet)
return self._make_subnet_dict(subnet, context=context)
def _get_subnetpool_id(self, subnet):
"""Returns the subnetpool id for this request
@ -554,7 +576,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
with context.session.begin(subtransactions=True):
subnet, changes = self.ipam.update_db_subnet(context, id, s)
result = self._make_subnet_dict(subnet)
result = self._make_subnet_dict(subnet, context=context)
# Keep up with fields that changed
result.update(changes)
return result
@ -634,7 +656,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def get_subnet(self, context, id, fields=None):
subnet = self._get_subnet(context, id)
return self._make_subnet_dict(subnet, fields)
return self._make_subnet_dict(subnet, fields, context=context)
def get_subnets(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
@ -914,7 +936,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if subnet_ids:
query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
query = self._apply_filters_to_query(query, Port, filters)
query = self._apply_filters_to_query(query, Port, filters, context)
if limit and page_reverse and sorts:
sorts = [(s[0], not s[1]) for s in sorts]
query = sqlalchemyutils.paginate_query(query, Port, limit,

View File

@ -499,7 +499,6 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
subnet = self._save_subnet(context,
network,
self._make_subnet_args(
network.shared,
subnet_request,
subnet,
subnetpool_id),

View File

@ -1,3 +1,3 @@
30018084ec99
313373c0ffee
4ffceebfada
8675309a5c4f
kilo

View File

@ -0,0 +1,69 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""network_rbac
Revision ID: 4ffceebfada
Revises: 30018084ec99
Create Date: 2015-06-14 13:12:04.012457
"""
# revision identifiers, used by Alembic.
revision = '4ffceebfada'
down_revision = '30018084ec99'
depends_on = ('8675309a5c4f',)
from alembic import op
from oslo_utils import uuidutils
import sqlalchemy as sa
# A simple model of the networks table with only the fields needed for
# the migration.
network = sa.Table('networks', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=255)),
sa.Column('shared', sa.Boolean(), nullable=False))
networkrbacs = sa.Table(
'networkrbacs', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('object_id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=255), nullable=True,
index=True),
sa.Column('target_tenant', sa.String(length=255), nullable=False),
sa.Column('action', sa.String(length=255), nullable=False))
def upgrade():
op.bulk_insert(networkrbacs, get_values())
op.drop_column('networks', 'shared')
# the shared column on subnets was just an internal representation of the
# shared status of the network it was related to. This is now handled by
# other logic so we just drop it.
op.drop_column('subnets', 'shared')
def get_values():
session = sa.orm.Session(bind=op.get_bind())
values = []
for row in session.query(network).filter(network.c.shared).all():
values.append({'id': uuidutils.generate_uuid(), 'object_id': row[0],
'tenant_id': row[1], 'target_tenant': '*',
'action': 'access_as_shared'})
# this commit appears to be necessary to allow further operations
session.commit()
return values

View File

@ -0,0 +1,47 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""network_rbac
Revision ID: 8675309a5c4f
Revises: 313373c0ffee
Create Date: 2015-06-14 13:12:04.012457
"""
# revision identifiers, used by Alembic.
revision = '8675309a5c4f'
down_revision = '313373c0ffee'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'networkrbacs',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('object_id', sa.String(length=36), nullable=False),
sa.Column('tenant_id', sa.String(length=255), nullable=True,
index=True),
sa.Column('target_tenant', sa.String(length=255), nullable=False),
sa.Column('action', sa.String(length=255), nullable=False),
sa.ForeignKeyConstraint(['object_id'],
['networks.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint(
'action', 'object_id', 'target_tenant',
name='uniq_networkrbacs0tenant_target0object_id0action'))

View File

@ -41,6 +41,7 @@ from neutron.db import models_v2 # noqa
from neutron.db import portbindings_db # noqa
from neutron.db import portsecurity_db # noqa
from neutron.db import quota_db # noqa
from neutron.db import rbac_db_models # noqa
from neutron.db import securitygroups_db # noqa
from neutron.db import servicetype_db # noqa
from neutron.ipam.drivers.neutrondb_ipam import db_models # noqa

View File

@ -15,6 +15,7 @@
from oslo_utils import uuidutils
import sqlalchemy as sa
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import orm
from neutron.api.v2 import attributes as attr
@ -205,7 +206,6 @@ class Subnet(model_base.BASEV2, HasId, HasTenant):
backref='subnet',
cascade='all, delete, delete-orphan',
lazy='joined')
shared = sa.Column(sa.Boolean)
ipv6_ra_mode = sa.Column(sa.Enum(constants.IPV6_SLAAC,
constants.DHCPV6_STATEFUL,
constants.DHCPV6_STATELESS,
@ -214,6 +214,7 @@ class Subnet(model_base.BASEV2, HasId, HasTenant):
constants.DHCPV6_STATEFUL,
constants.DHCPV6_STATELESS,
name='ipv6_address_modes'), nullable=True)
rbac_entries = association_proxy('networks', 'rbac_entries')
class SubnetPoolPrefix(model_base.BASEV2):
@ -251,10 +252,13 @@ class Network(model_base.BASEV2, HasId, HasTenant):
name = sa.Column(sa.String(attr.NAME_MAX_LEN))
ports = orm.relationship(Port, backref='networks')
subnets = orm.relationship(Subnet, backref='networks',
lazy="joined")
subnets = orm.relationship(
Subnet, backref=orm.backref('networks', lazy='joined'),
lazy="joined")
status = sa.Column(sa.String(16))
admin_state_up = sa.Column(sa.Boolean)
shared = sa.Column(sa.Boolean)
mtu = sa.Column(sa.Integer, nullable=True)
vlan_transparent = sa.Column(sa.Boolean, nullable=True)
rbac_entries = orm.relationship("NetworkRBAC", backref='network',
lazy='joined',
cascade='all, delete, delete-orphan')

View File

@ -0,0 +1,85 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import sqlalchemy as sa
from sqlalchemy.orm import validates
from neutron.common import exceptions as n_exc
from neutron.db import model_base
from neutron.db import models_v2
class InvalidActionForType(n_exc.InvalidInput):
message = _("Invalid action '%(action)s' for object type "
"'%(object_type)s'. Valid actions: %(valid_actions)s")
class RBACColumns(models_v2.HasId, models_v2.HasTenant):
"""Mixin that object-specific RBAC tables should inherit.
All RBAC tables should inherit directly from this one because
the RBAC code uses the __subclasses__() method to discover the
RBAC types.
"""
# the target_tenant is the subject that the policy will affect. this may
# also be a wildcard '*' to indicate all tenants or it may be a role if
# neutron gets better integration with keystone
target_tenant = sa.Column(sa.String(255), nullable=False)
action = sa.Column(sa.String(255), nullable=False)
@abc.abstractproperty
def object_type(self):
# this determines the name that users will use in the API
# to reference the type. sub-classes should set their own
pass
__table_args__ = (
sa.UniqueConstraint('target_tenant', 'object_id', 'action'),
model_base.BASEV2.__table_args__
)
@validates('action')
def _validate_action(self, key, action):
if action not in self.get_valid_actions():
raise InvalidActionForType(
action=action, object_type=self.object_type,
valid_actions=self.get_valid_actions())
return action
@abc.abstractmethod
def get_valid_actions(self):
# object table needs to override this to return an interable
# with the valid actions rbac entries
pass
def get_type_model_map():
return {table.object_type: table for table in RBACColumns.__subclasses__()}
class NetworkRBAC(RBACColumns, model_base.BASEV2):
"""RBAC table for networks."""
object_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id', ondelete="CASCADE"),
nullable=False)
object_type = 'network'
def get_valid_actions(self):
return ('access_as_shared',)

View File

@ -846,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
while True:
with session.begin(subtransactions=True):
record = self._get_subnet(context, id)
subnet = self._make_subnet_dict(record, None)
subnet = self._make_subnet_dict(record, None, context=context)
qry_allocated = (session.query(models_v2.IPAllocation).
filter_by(subnet_id=id).
join(models_v2.Port))

View File

@ -32,6 +32,49 @@ class SharedNetworksTest(base.BaseAdminNetworkTest):
super(SharedNetworksTest, cls).resource_setup()
cls.shared_network = cls.create_shared_network()
@test.idempotent_id('6661d219-b96d-4597-ad10-55766123421a')
def test_filtering_shared_networks(self):
# this test is necessary because the 'shared' column does not actually
# exist on networks so the filter function has to translate it into
# queries against the RBAC table
self.create_network()
self._check_shared_correct(
self.client.list_networks(shared=True)['networks'], True)
self._check_shared_correct(
self.admin_client.list_networks(shared=True)['networks'], True)
self._check_shared_correct(
self.client.list_networks(shared=False)['networks'], False)
self._check_shared_correct(
self.admin_client.list_networks(shared=False)['networks'], False)
def _check_shared_correct(self, items, shared):
self.assertNotEmpty(items)
self.assertTrue(all(n['shared'] == shared for n in items))
@test.idempotent_id('6661d219-b96d-4597-ad10-51672353421a')
def test_filtering_shared_subnets(self):
# shared subnets need to be tested because their shared status isn't
# visible as a regular API attribute and it's solely dependent on the
# parent network
reg = self.create_network()
priv = self.create_subnet(reg, client=self.client)
shared = self.create_subnet(self.shared_network,
client=self.admin_client)
self.assertIn(shared, self.client.list_subnets(shared=True)['subnets'])
self.assertIn(shared,
self.admin_client.list_subnets(shared=True)['subnets'])
self.assertNotIn(priv,
self.client.list_subnets(shared=True)['subnets'])
self.assertNotIn(priv,
self.admin_client.list_subnets(shared=True)['subnets'])
self.assertIn(priv, self.client.list_subnets(shared=False)['subnets'])
self.assertIn(priv,
self.admin_client.list_subnets(shared=False)['subnets'])
self.assertNotIn(shared,
self.client.list_subnets(shared=False)['subnets'])
self.assertNotIn(shared,
self.admin_client.list_subnets(shared=False)['subnets'])
@test.idempotent_id('6661d219-b96d-4597-ad10-55766ce4abf7')
def test_create_update_shared_network(self):
shared_network = self.create_shared_network()

View File

@ -2293,7 +2293,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
# must query db to see whether subnet's shared attribute
# has been updated or not
ctx = context.Context('', '', is_admin=True)
subnet_db = manager.NeutronManager.get_plugin()._get_subnet(
subnet_db = manager.NeutronManager.get_plugin().get_subnet(
ctx, subnet['subnet']['id'])
self.assertEqual(subnet_db['shared'], True)
@ -3806,13 +3806,16 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self.subnet(network=network) as v4_subnet,\
self.port(subnet=v4_subnet, device_owner=device_owner) as port:
if insert_db_reference_error:
def db_ref_err_for_ipalloc(instance):
orig = orm.Session.add
def db_ref_err_for_ipalloc(s, instance):
if instance.__class__.__name__ == 'IPAllocation':
raise db_exc.DBReferenceError(
'dummy_table', 'dummy_constraint',
'dummy_key', 'dummy_key_table')
return orig(s, instance)
mock.patch.object(orm.Session, 'add',
side_effect=db_ref_err_for_ipalloc).start()
new=db_ref_err_for_ipalloc).start()
mock.patch.object(non_ipam.IpamNonPluggableBackend,
'_get_subnet',
return_value=mock.Mock()).start()
@ -5323,8 +5326,8 @@ class DbModelTestCase(base.BaseTestCase):
exp_middle = "[object at %x]" % id(network)
exp_end_with = (" {tenant_id=None, id=None, "
"name='net_net', status='OK', "
"admin_state_up=True, shared=None, "
"mtu=None, vlan_transparent=None}>")
"admin_state_up=True, mtu=None, "
"vlan_transparent=None}>")
final_exp = exp_start_with + exp_middle + exp_end_with
self.assertEqual(actual_repr_output, final_exp)