Use RBACBaseObject OVO in neutron/db/rbac_db_mixin.py

Implemented RBACBaseObject metaclass, inherited from NetworkRBAC
and QosPolicyRBAC.

Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db

Change-Id: I07d1e0bb27e19bd19911255c069fa27a42451264
This commit is contained in:
Rodolfo Alonso Hernandez 2018-10-13 18:12:09 +01:00
parent 4f3cde2322
commit 8eee74f626
12 changed files with 320 additions and 81 deletions

@ -21,12 +21,11 @@ from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as n_exc
from oslo_db import exception as db_exc
from sqlalchemy.orm import exc
from neutron.db import _model_query as model_query
from neutron.db import common_db_mixin
from neutron.db import rbac_db_models as models
from neutron.extensions import rbac as ext_rbac
from neutron.objects import base as base_obj
from neutron.objects import rbac as rbac_obj
class RbacPluginMixin(common_db_mixin.CommonDbMixin):
@ -44,57 +43,57 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin):
policy=e)
except c_exc.CallbackFailure as e:
raise n_exc.InvalidInput(error_message=e)
dbmodel = models.get_type_model_map()[e['object_type']]
rbac_class = (
rbac_obj.RBACBaseObject.get_type_class_map()[e['object_type']])
try:
with context.session.begin(subtransactions=True):
db_entry = dbmodel(object_id=e['object_id'],
target_tenant=e['target_tenant'],
action=e['action'],
tenant_id=e['tenant_id'])
context.session.add(db_entry)
rbac_args = {'project_id': e['project_id'],
'object_id': e['object_id'],
'action': e['action'],
'target_tenant': e['target_tenant']}
_rbac_obj = rbac_class(context, **rbac_args)
_rbac_obj.create()
except db_exc.DBDuplicateEntry:
raise ext_rbac.DuplicateRbacPolicy()
return self._make_rbac_policy_dict(db_entry)
return self._make_rbac_policy_dict(_rbac_obj)
@staticmethod
def _make_rbac_policy_dict(db_entry, fields=None):
res = {f: db_entry[f] for f in ('id', 'tenant_id', 'target_tenant',
'action', 'object_id')}
res['object_type'] = db_entry.object_type
def _make_rbac_policy_dict(entry, fields=None):
res = {f: entry[f] for f in ('id', 'project_id', 'target_tenant',
'action', 'object_id')}
res['object_type'] = entry.db_model.object_type
return db_utils.resource_fields(res, fields)
@db_api.retry_if_session_inactive()
def update_rbac_policy(self, context, id, rbac_policy):
pol = rbac_policy['rbac_policy']
entry = self._get_rbac_policy(context, id)
object_type = entry['object_type']
object_type = entry.db_model.object_type
try:
registry.notify(resources.RBAC_POLICY, events.BEFORE_UPDATE, self,
context=context, policy=entry,
object_type=object_type, policy_update=pol)
except c_exc.CallbackFailure as ex:
raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'],
raise ext_rbac.RbacPolicyInUse(object_id=entry.object_id,
details=ex)
with context.session.begin(subtransactions=True):
entry.update(pol)
entry.update_fields(pol)
entry.update()
return self._make_rbac_policy_dict(entry)
@db_api.retry_if_session_inactive()
def delete_rbac_policy(self, context, id):
entry = self._get_rbac_policy(context, id)
object_type = entry['object_type']
object_type = entry.db_model.object_type
try:
registry.notify(resources.RBAC_POLICY, events.BEFORE_DELETE, self,
context=context, object_type=object_type,
policy=entry)
except c_exc.CallbackFailure as ex:
raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'],
raise ext_rbac.RbacPolicyInUse(object_id=entry.object_id,
details=ex)
# make a dict copy because deleting the entry will nullify its
# object_id link to network
entry_dict = dict(entry)
with context.session.begin(subtransactions=True):
context.session.delete(entry)
entry_dict = entry.to_dict()
entry.delete()
registry.notify(resources.RBAC_POLICY, events.AFTER_DELETE, self,
context=context, object_type=object_type,
policy=entry_dict)
@ -102,12 +101,11 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin):
def _get_rbac_policy(self, context, id):
object_type = self._get_object_type(context, id)
dbmodel = models.get_type_model_map()[object_type]
try:
return model_query.query_with_hooks(
context, dbmodel).filter(dbmodel.id == id).one()
except exc.NoResultFound:
rbac_class = rbac_obj.RBACBaseObject.get_type_class_map()[object_type]
_rbac_obj = rbac_class.get_object(context, id=id)
if not _rbac_obj:
raise ext_rbac.RbacPolicyNotFound(id=id, object_type=object_type)
return _rbac_obj
@db_api.retry_if_session_inactive()
def get_rbac_policy(self, context, id, fields=None):
@ -117,21 +115,18 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin):
@db_api.retry_if_session_inactive()
def get_rbac_policies(self, context, filters=None, fields=None,
sorts=None, limit=None, page_reverse=False):
pager = base_obj.Pager(sorts, limit, page_reverse)
filters = filters or {}
object_type_filters = filters.pop('object_type', None)
models_to_query = [
m for t, m in models.get_type_model_map().items()
if object_type_filters is None or t in object_type_filters
]
collections = [model_query.get_collection(
context, model, self._make_rbac_policy_dict,
filters=filters, fields=fields, sorts=sorts,
limit=limit, page_reverse=page_reverse)
for model in models_to_query]
# NOTE(kevinbenton): we don't have to worry about pagination,
# limits, or page_reverse currently because allow_pagination is
# set to False in 'neutron.extensions.rbac'
return [item for c in collections for item in c]
object_types = filters.pop('object_type', None)
rbac_classes_to_query = [
o for t, o in rbac_obj.RBACBaseObject.get_type_class_map().items()
if not object_types or t in object_types]
rbac_objs = []
for rbac_class in rbac_classes_to_query:
rbac_objs += rbac_class.get_objects(context, _pager=pager,
**filters)
return [self._make_rbac_policy_dict(_rbac_obj, fields)
for _rbac_obj in rbac_objs]
def _get_object_type(self, context, entry_id):
"""Scans all RBAC tables for an ID to figure out the type.
@ -141,9 +136,9 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin):
"""
if entry_id in self.object_type_cache:
return self.object_type_cache[entry_id]
for otype, model in models.get_type_model_map().items():
if (context.session.query(model.id).
filter(model.id == entry_id).first()):
for otype, rbac_class in \
rbac_obj.RBACBaseObject.get_type_class_map().items():
if rbac_class.count(context, id=entry_id):
self.object_type_cache[entry_id] = otype
return otype
raise ext_rbac.RbacPolicyNotFound(id=entry_id, object_type='unknown')

@ -72,8 +72,9 @@ class RBACColumns(model_base.HasId, model_base.HasProject):
valid_actions=self.get_valid_actions())
return action
@staticmethod
@abc.abstractmethod
def get_valid_actions(self):
def get_valid_actions():
# object table needs to override this to return an interable
# with the valid actions rbac entries
pass
@ -96,7 +97,8 @@ class NetworkRBAC(RBACColumns, model_base.BASEV2):
object_type = 'network'
revises_on_change = ('network', )
def get_valid_actions(self):
@staticmethod
def get_valid_actions():
actions = (ACCESS_SHARED,)
pl = directory.get_plugin()
if 'external-net' in pl.supported_extension_aliases:
@ -110,5 +112,6 @@ class QosPolicyRBAC(RBACColumns, model_base.BASEV2):
object_id = _object_id_column('qos_policies.id')
object_type = 'qos_policy'
def get_valid_actions(self):
@staticmethod
def get_valid_actions():
return (ACCESS_SHARED,)

@ -30,26 +30,21 @@ from neutron.objects import base
from neutron.objects import common_types
from neutron.objects.extensions import port_security as base_ps
from neutron.objects.qos import binding
from neutron.objects import rbac
from neutron.objects import rbac_db
@base.NeutronObjectRegistry.register
class NetworkRBAC(base.NeutronDbObject):
class NetworkRBAC(rbac.RBACBaseObject):
# Version 1.0: Initial version
# Version 1.1: Added 'id' and 'project_id'
# Version 1.2: Inherit from rbac.RBACBaseObject; changed 'object_id' from
# StringField to UUIDField
VERSION = '1.1'
VERSION = '1.2'
db_model = rbac_db_models.NetworkRBAC
fields = {
'id': common_types.UUIDField(),
'project_id': obj_fields.StringField(),
'object_id': obj_fields.StringField(),
'target_tenant': obj_fields.StringField(),
'action': obj_fields.StringField(),
}
def obj_make_compatible(self, primitive, target_version):
_target_version = versionutils.convert_version_to_tuple(target_version)
if _target_version < (1, 1):

@ -31,21 +31,27 @@ from neutron.objects import common_types
from neutron.objects.db import api as obj_db_api
from neutron.objects.qos import binding
from neutron.objects.qos import rule as rule_obj_impl
from neutron.objects import rbac
from neutron.objects import rbac_db
@base_db.NeutronObjectRegistry.register
class QosPolicyRBAC(base_db.NeutronDbObject):
class QosPolicyRBAC(rbac.RBACBaseObject):
# Version 1.0: Initial version
VERSION = '1.0'
# Version 1.1: Inherit from rbac_db.RBACBaseObject; added 'id' and
# 'project_id'; changed 'object_id' from StringField to
# UUIDField
VERSION = '1.1'
db_model = rbac_db_models.QosPolicyRBAC
fields = {
'object_id': obj_fields.StringField(),
'target_tenant': obj_fields.StringField(),
'action': obj_fields.StringField(),
}
def obj_make_compatible(self, primitive, target_version):
_target_version = versionutils.convert_version_to_tuple(target_version)
if _target_version < (1, 1):
standard_fields = ['id', 'project_id']
for f in standard_fields:
primitive.pop(f)
@base_db.NeutronObjectRegistry.register

62
neutron/objects/rbac.py Normal file

@ -0,0 +1,62 @@
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_versionedobjects import fields as obj_fields
from six import add_metaclass
from sqlalchemy import and_
from neutron.db import rbac_db_models as models
from neutron.objects import base
from neutron.objects import common_types
@add_metaclass(abc.ABCMeta)
class RBACBaseObject(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'id': common_types.UUIDField(),
'project_id': obj_fields.StringField(),
'object_id': common_types.UUIDField(),
'target_tenant': obj_fields.StringField(),
'action': obj_fields.StringField(),
}
fields_no_update = ['id', 'project_id', 'object_id']
@classmethod
def get_projects(cls, context, object_id=None, action=None,
target_tenant=None):
clauses = []
if object_id:
clauses.append(models.NetworkRBAC.object_id == object_id)
if action:
clauses.append(models.NetworkRBAC.action == action)
if target_tenant:
clauses.append(models.NetworkRBAC.target_tenant ==
target_tenant)
query = context.session.query(models.NetworkRBAC.target_tenant)
if clauses:
query = query.filter(and_(*clauses))
return [data[0] for data in query]
@classmethod
def get_type_class_map(cls):
return {klass.db_model.object_type: klass
for klass in cls.__subclasses__()}

@ -198,11 +198,11 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
return callback_map[event](resource, event, trigger, context,
object_type, policy, **kwargs)
def attach_rbac(self, obj_id, tenant_id, target_tenant='*'):
def attach_rbac(self, obj_id, project_id, target_tenant='*'):
obj_type = self.rbac_db_cls.db_model.object_type
rbac_policy = {'rbac_policy': {'object_id': obj_id,
'target_tenant': target_tenant,
'tenant_id': tenant_id,
'project_id': project_id,
'object_type': obj_type,
'action': models.ACCESS_SHARED}}
return self.create_rbac_policy(self.obj_context, rbac_policy)
@ -244,7 +244,7 @@ def _update_hook(self, update_orig):
def _create_post(self):
if self.shared:
self.attach_rbac(self.id, self.obj_context.tenant_id)
self.attach_rbac(self.id, self.project_id)
def _create_hook(self, orig_create):

@ -169,7 +169,9 @@ class QoSPlugin(qos.QoSPluginBase):
# We need to remove redundant keyword.
# This cannot be done in other place of stacktrace, because neutron
# needs to be backward compatible.
policy['policy'].pop('tenant_id', None)
tenant_id = policy['policy'].pop('tenant_id', None)
if not policy['policy'].get('project_id'):
policy['policy']['project_id'] = tenant_id
policy_obj = policy_object.QosPolicy(context, **policy['policy'])
with db_api.CONTEXT_WRITER.using(context):
policy_obj.create()

@ -18,9 +18,13 @@ import mock
from neutron_lib.callbacks import events
from neutron_lib import constants
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.db.db_base_plugin_v2 import NeutronDbPluginV2 as db_plugin_v2
from neutron.db import rbac_db_models
from neutron.extensions import rbac as ext_rbac
from neutron.objects import network as network_obj
from neutron.objects.qos import policy as qos_policy_obj
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
@ -30,11 +34,12 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
super(NetworkRbacTestcase, self).setUp(plugin='ml2')
def _make_networkrbac(self, network, target, action='access_as_shared'):
policy = {'rbac_policy': {'tenant_id': network['network']['tenant_id'],
'object_id': network['network']['id'],
'object_type': 'network',
'action': action,
'target_tenant': target}}
policy = {
'rbac_policy': {'project_id': network['network']['project_id'],
'object_id': network['network']['id'],
'object_type': 'network',
'action': action,
'target_tenant': target}}
return policy
def _setup_networkrbac_and_port(self, network, target_tenant):
@ -197,7 +202,11 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
def test_delete_networkrbac_self_share(self):
net_id = 'my-network'
net_owner = 'my-tenant-id'
net = {'network': {'id': net_id, 'tenant_id': net_owner}}
# NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in
# NeutronDbPluginV2.validate_network_rbac_policy_change()
net = {'network': {'id': net_id,
'tenant_id': net_owner,
'project_id': net_owner}}
policy = self._make_networkrbac(net, net_owner)['rbac_policy']
kwargs = {}
@ -213,7 +222,11 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
def test_update_self_share_networkrbac(self):
net_id = 'my-network'
net_owner = 'my-tenant-id'
net = {'network': {'id': net_id, 'tenant_id': net_owner}}
# NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in
# NeutronDbPluginV2.validate_network_rbac_policy_change()
net = {'network': {'id': net_id,
'tenant_id': net_owner,
'project_id': net_owner}}
policy = self._make_networkrbac(net, net_owner)['rbac_policy']
kwargs = {'policy_update': {'target_tenant': 'new-target-tenant'}}
@ -225,3 +238,46 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
None, events.BEFORE_UPDATE, None,
self.context, 'network', policy, **kwargs)
self.assertEqual(0, ensure.call_count)
def _create_rbac_obj(self, _class):
return _class(id=uuidutils.generate_uuid(),
project_id='project_id',
object_id=uuidutils.generate_uuid(),
target_tenant='target_tenant',
action=rbac_db_models.ACCESS_SHARED)
@mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects')
def test_get_rbac_policies_qos_policy(self, mock_qos_get_objects):
qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC)
mock_qos_get_objects.return_value = [qos_policy_rbac]
filters = {'object_type': ['qos_policy']}
rbac_policies = self.plugin.get_rbac_policies(self.context, filters)
self.assertEqual(1, len(rbac_policies))
self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac),
rbac_policies[0])
@mock.patch.object(network_obj.NetworkRBAC, 'get_objects')
def test_get_rbac_policies_network(self, mock_net_get_objects):
net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC)
mock_net_get_objects.return_value = [net_rbac]
filters = {'object_type': ['network']}
rbac_policies = self.plugin.get_rbac_policies(self.context, filters)
self.assertEqual(1, len(rbac_policies))
self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac),
rbac_policies[0])
@mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects')
@mock.patch.object(network_obj.NetworkRBAC, 'get_objects')
def test_get_rbac_policies_all_classes(self, mock_net_get_objects,
mock_qos_get_objects):
net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC)
qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC)
mock_net_get_objects.return_value = [net_rbac]
mock_qos_get_objects.return_value = [qos_policy_rbac]
rbac_policies = self.plugin.get_rbac_policies(self.context)
self.assertEqual(2, len(rbac_policies))
rbac_policies = sorted(rbac_policies, key=lambda k: k['object_type'])
self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac),
rbac_policies[0])
self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac),
rbac_policies[1])

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
import mock
from neutron_lib import constants as n_const
from neutron_lib.services.qos import constants as qos_consts
@ -35,7 +37,46 @@ RULE_OBJ_CLS = {
}
# TODO(ihrachys): add tests for QosPolicyRBAC
class _QosPolicyRBACBase(object):
def get_random_object_fields(self, obj_cls=None):
fields = (super(_QosPolicyRBACBase, self).
get_random_object_fields(obj_cls))
rnd_actions = self._test_class.db_model.get_valid_actions()
idx = random.randint(0, len(rnd_actions) - 1)
fields['action'] = rnd_actions[idx]
return fields
class QosPolicyRBACDbObjectTestCase(_QosPolicyRBACBase,
test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = policy.QosPolicyRBAC
def setUp(self):
super(QosPolicyRBACDbObjectTestCase, self).setUp()
for obj in self.db_objs:
policy_obj = policy.QosPolicy(self.context,
id=obj['object_id'],
project_id=obj['project_id'])
policy_obj.create()
def _create_test_qos_policy_rbac(self):
self.objs[0].create()
return self.objs[0]
def test_object_version_degradation_1_1_to_1_0_no_id_no_project_id(self):
qos_policy_rbac_obj = self._create_test_qos_policy_rbac()
qos_policy_rbac_dict = qos_policy_rbac_obj.obj_to_primitive('1.0')
self.assertNotIn('project_id',
qos_policy_rbac_dict['versioned_object.data'])
self.assertNotIn('id', qos_policy_rbac_dict['versioned_object.data'])
class QosPolicyRBACIfaceObjectTestCase(_QosPolicyRBACBase,
test_base.BaseObjectIfaceTestCase):
_test_class = policy.QosPolicyRBAC
class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):

@ -10,8 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
import mock
from neutron.db import rbac_db_models
from neutron.objects import base as obj_base
from neutron.objects import network
from neutron.objects.qos import binding
@ -20,7 +23,59 @@ from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
# TODO(ihrachys): add tests for NetworkRBAC
class _NetworkRBACBase(object):
def get_random_object_fields(self, obj_cls=None):
fields = (super(_NetworkRBACBase, self).
get_random_object_fields(obj_cls))
rnd_actions = self._test_class.db_model.get_valid_actions()
idx = random.randint(0, len(rnd_actions) - 1)
fields['action'] = rnd_actions[idx]
return fields
class NetworkRBACDbObjectTestCase(_NetworkRBACBase,
obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = network.NetworkRBAC
def setUp(self):
self._mock_get_valid_actions = mock.patch.object(
rbac_db_models.NetworkRBAC, 'get_valid_actions',
return_value=(rbac_db_models.ACCESS_EXTERNAL,
rbac_db_models.ACCESS_SHARED))
self.mock_get_valid_actions = self._mock_get_valid_actions.start()
super(NetworkRBACDbObjectTestCase, self).setUp()
for obj in self.db_objs:
net_obj = network.Network(self.context, id=obj['object_id'])
net_obj.create()
def _create_test_network_rbac(self):
self.objs[0].create()
return self.objs[0]
def test_object_version_degradation_1_1_to_1_0_no_id_no_project_id(self):
network_rbac_obj = self._create_test_network_rbac()
network_rbac_obj = network_rbac_obj.obj_to_primitive('1.0')
self.assertNotIn('project_id',
network_rbac_obj['versioned_object.data'])
self.assertNotIn('id', network_rbac_obj['versioned_object.data'])
class NetworkRBACIfaceOjectTestCase(_NetworkRBACBase,
obj_test_base.BaseObjectIfaceTestCase):
_test_class = network.NetworkRBAC
def setUp(self):
self._mock_get_valid_actions = mock.patch.object(
rbac_db_models.NetworkRBAC, 'get_valid_actions',
return_value=(rbac_db_models.ACCESS_EXTERNAL,
rbac_db_models.ACCESS_SHARED))
self.mock_get_valid_actions = self._mock_get_valid_actions.start()
super(NetworkRBACIfaceOjectTestCase, self).setUp()
class NetworkDhcpAgentBindingObjectIfaceTestCase(
obj_test_base.BaseObjectIfaceTestCase):

@ -60,7 +60,7 @@ object_data = {
'NetworkDhcpAgentBinding': '1.0-6eeceb5fb4335cd65a305016deb41c68',
'NetworkDNSDomain': '1.0-420db7910294608534c1e2e30d6d8319',
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'NetworkRBAC': '1.1-7f8baaf9ea4257a26408454ad0065adb',
'NetworkRBAC': '1.2-192845c5ed0718e1c54fac36936fcd7d',
'NetworkSegment': '1.0-57b7f2960971e3b95ded20cbc59244a8',
'Port': '1.4-1b6183bccfc2cd210919a1a72faefce1',
'PortBinding': '1.0-3306deeaa6deb01e33af06777d48d578',
@ -74,7 +74,7 @@ object_data = {
'QosBandwidthLimitRule': '1.3-51b662b12a8d1dfa89288d826c6d26d3',
'QosDscpMarkingRule': '1.3-0313c6554b34fd10c753cb63d638256c',
'QosMinimumBandwidthRule': '1.3-314c3419f4799067cc31cc319080adff',
'QosPolicyRBAC': '1.0-c8a67f39809c5a3c8c7f26f2f2c620b2',
'QosPolicyRBAC': '1.1-192845c5ed0718e1c54fac36936fcd7d',
'QosRuleType': '1.3-7286188edeb3a0386f9cf7979b9700fc',
'QosRuleTypeDriver': '1.0-7d8cb9f0ef661ac03700eae97118e3db',
'QosPolicy': '1.7-4adb0cde3102c10d8970ec9487fd7fe7',

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects import network
from neutron.objects.qos import policy
from neutron.objects import rbac
from neutron.tests import base as neutron_test_base
class RBACBaseObjectTestCase(neutron_test_base.BaseTestCase):
def test_get_type_class_map(self):
class_map = {'qos_policy': policy.QosPolicyRBAC,
'network': network.NetworkRBAC}
self.assertEqual(class_map, rbac.RBACBaseObject.get_type_class_map())