diff --git a/neutron/db/rbac_db_mixin.py b/neutron/db/rbac_db_mixin.py index ffe8df32e64..7e9521c2684 100644 --- a/neutron/db/rbac_db_mixin.py +++ b/neutron/db/rbac_db_mixin.py @@ -21,12 +21,11 @@ from neutron_lib.db import api as db_api from neutron_lib.db import utils as db_utils from neutron_lib import exceptions as n_exc from oslo_db import exception as db_exc -from sqlalchemy.orm import exc -from neutron.db import _model_query as model_query from neutron.db import common_db_mixin -from neutron.db import rbac_db_models as models from neutron.extensions import rbac as ext_rbac +from neutron.objects import base as base_obj +from neutron.objects import rbac as rbac_obj class RbacPluginMixin(common_db_mixin.CommonDbMixin): @@ -44,57 +43,57 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin): policy=e) except c_exc.CallbackFailure as e: raise n_exc.InvalidInput(error_message=e) - dbmodel = models.get_type_model_map()[e['object_type']] + rbac_class = ( + rbac_obj.RBACBaseObject.get_type_class_map()[e['object_type']]) try: - with context.session.begin(subtransactions=True): - db_entry = dbmodel(object_id=e['object_id'], - target_tenant=e['target_tenant'], - action=e['action'], - tenant_id=e['tenant_id']) - context.session.add(db_entry) + rbac_args = {'project_id': e['project_id'], + 'object_id': e['object_id'], + 'action': e['action'], + 'target_tenant': e['target_tenant']} + _rbac_obj = rbac_class(context, **rbac_args) + _rbac_obj.create() except db_exc.DBDuplicateEntry: raise ext_rbac.DuplicateRbacPolicy() - return self._make_rbac_policy_dict(db_entry) + return self._make_rbac_policy_dict(_rbac_obj) @staticmethod - def _make_rbac_policy_dict(db_entry, fields=None): - res = {f: db_entry[f] for f in ('id', 'tenant_id', 'target_tenant', - 'action', 'object_id')} - res['object_type'] = db_entry.object_type + def _make_rbac_policy_dict(entry, fields=None): + res = {f: entry[f] for f in ('id', 'project_id', 'target_tenant', + 'action', 'object_id')} + res['object_type'] = entry.db_model.object_type return db_utils.resource_fields(res, fields) @db_api.retry_if_session_inactive() def update_rbac_policy(self, context, id, rbac_policy): pol = rbac_policy['rbac_policy'] entry = self._get_rbac_policy(context, id) - object_type = entry['object_type'] + object_type = entry.db_model.object_type try: registry.notify(resources.RBAC_POLICY, events.BEFORE_UPDATE, self, context=context, policy=entry, object_type=object_type, policy_update=pol) except c_exc.CallbackFailure as ex: - raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'], + raise ext_rbac.RbacPolicyInUse(object_id=entry.object_id, details=ex) - with context.session.begin(subtransactions=True): - entry.update(pol) + entry.update_fields(pol) + entry.update() return self._make_rbac_policy_dict(entry) @db_api.retry_if_session_inactive() def delete_rbac_policy(self, context, id): entry = self._get_rbac_policy(context, id) - object_type = entry['object_type'] + object_type = entry.db_model.object_type try: registry.notify(resources.RBAC_POLICY, events.BEFORE_DELETE, self, context=context, object_type=object_type, policy=entry) except c_exc.CallbackFailure as ex: - raise ext_rbac.RbacPolicyInUse(object_id=entry['object_id'], + raise ext_rbac.RbacPolicyInUse(object_id=entry.object_id, details=ex) # make a dict copy because deleting the entry will nullify its # object_id link to network - entry_dict = dict(entry) - with context.session.begin(subtransactions=True): - context.session.delete(entry) + entry_dict = entry.to_dict() + entry.delete() registry.notify(resources.RBAC_POLICY, events.AFTER_DELETE, self, context=context, object_type=object_type, policy=entry_dict) @@ -102,12 +101,11 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin): def _get_rbac_policy(self, context, id): object_type = self._get_object_type(context, id) - dbmodel = models.get_type_model_map()[object_type] - try: - return model_query.query_with_hooks( - context, dbmodel).filter(dbmodel.id == id).one() - except exc.NoResultFound: + rbac_class = rbac_obj.RBACBaseObject.get_type_class_map()[object_type] + _rbac_obj = rbac_class.get_object(context, id=id) + if not _rbac_obj: raise ext_rbac.RbacPolicyNotFound(id=id, object_type=object_type) + return _rbac_obj @db_api.retry_if_session_inactive() def get_rbac_policy(self, context, id, fields=None): @@ -117,21 +115,18 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin): @db_api.retry_if_session_inactive() def get_rbac_policies(self, context, filters=None, fields=None, sorts=None, limit=None, page_reverse=False): + pager = base_obj.Pager(sorts, limit, page_reverse) filters = filters or {} - object_type_filters = filters.pop('object_type', None) - models_to_query = [ - m for t, m in models.get_type_model_map().items() - if object_type_filters is None or t in object_type_filters - ] - collections = [model_query.get_collection( - context, model, self._make_rbac_policy_dict, - filters=filters, fields=fields, sorts=sorts, - limit=limit, page_reverse=page_reverse) - for model in models_to_query] - # NOTE(kevinbenton): we don't have to worry about pagination, - # limits, or page_reverse currently because allow_pagination is - # set to False in 'neutron.extensions.rbac' - return [item for c in collections for item in c] + object_types = filters.pop('object_type', None) + rbac_classes_to_query = [ + o for t, o in rbac_obj.RBACBaseObject.get_type_class_map().items() + if not object_types or t in object_types] + rbac_objs = [] + for rbac_class in rbac_classes_to_query: + rbac_objs += rbac_class.get_objects(context, _pager=pager, + **filters) + return [self._make_rbac_policy_dict(_rbac_obj, fields) + for _rbac_obj in rbac_objs] def _get_object_type(self, context, entry_id): """Scans all RBAC tables for an ID to figure out the type. @@ -141,9 +136,9 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin): """ if entry_id in self.object_type_cache: return self.object_type_cache[entry_id] - for otype, model in models.get_type_model_map().items(): - if (context.session.query(model.id). - filter(model.id == entry_id).first()): + for otype, rbac_class in \ + rbac_obj.RBACBaseObject.get_type_class_map().items(): + if rbac_class.count(context, id=entry_id): self.object_type_cache[entry_id] = otype return otype raise ext_rbac.RbacPolicyNotFound(id=entry_id, object_type='unknown') diff --git a/neutron/db/rbac_db_models.py b/neutron/db/rbac_db_models.py index 913e41d0c02..cfadc9be7e9 100644 --- a/neutron/db/rbac_db_models.py +++ b/neutron/db/rbac_db_models.py @@ -72,8 +72,9 @@ class RBACColumns(model_base.HasId, model_base.HasProject): valid_actions=self.get_valid_actions()) return action + @staticmethod @abc.abstractmethod - def get_valid_actions(self): + def get_valid_actions(): # object table needs to override this to return an interable # with the valid actions rbac entries pass @@ -96,7 +97,8 @@ class NetworkRBAC(RBACColumns, model_base.BASEV2): object_type = 'network' revises_on_change = ('network', ) - def get_valid_actions(self): + @staticmethod + def get_valid_actions(): actions = (ACCESS_SHARED,) pl = directory.get_plugin() if 'external-net' in pl.supported_extension_aliases: @@ -110,5 +112,6 @@ class QosPolicyRBAC(RBACColumns, model_base.BASEV2): object_id = _object_id_column('qos_policies.id') object_type = 'qos_policy' - def get_valid_actions(self): + @staticmethod + def get_valid_actions(): return (ACCESS_SHARED,) diff --git a/neutron/objects/network.py b/neutron/objects/network.py index 4baca1aaea2..1cbc6f4e3f5 100644 --- a/neutron/objects/network.py +++ b/neutron/objects/network.py @@ -30,26 +30,21 @@ from neutron.objects import base from neutron.objects import common_types from neutron.objects.extensions import port_security as base_ps from neutron.objects.qos import binding +from neutron.objects import rbac from neutron.objects import rbac_db @base.NeutronObjectRegistry.register -class NetworkRBAC(base.NeutronDbObject): +class NetworkRBAC(rbac.RBACBaseObject): # Version 1.0: Initial version # Version 1.1: Added 'id' and 'project_id' + # Version 1.2: Inherit from rbac.RBACBaseObject; changed 'object_id' from + # StringField to UUIDField - VERSION = '1.1' + VERSION = '1.2' db_model = rbac_db_models.NetworkRBAC - fields = { - 'id': common_types.UUIDField(), - 'project_id': obj_fields.StringField(), - 'object_id': obj_fields.StringField(), - 'target_tenant': obj_fields.StringField(), - 'action': obj_fields.StringField(), - } - def obj_make_compatible(self, primitive, target_version): _target_version = versionutils.convert_version_to_tuple(target_version) if _target_version < (1, 1): diff --git a/neutron/objects/qos/policy.py b/neutron/objects/qos/policy.py index 870f4ac860e..6508e3591e7 100644 --- a/neutron/objects/qos/policy.py +++ b/neutron/objects/qos/policy.py @@ -31,21 +31,27 @@ from neutron.objects import common_types from neutron.objects.db import api as obj_db_api from neutron.objects.qos import binding from neutron.objects.qos import rule as rule_obj_impl +from neutron.objects import rbac from neutron.objects import rbac_db @base_db.NeutronObjectRegistry.register -class QosPolicyRBAC(base_db.NeutronDbObject): +class QosPolicyRBAC(rbac.RBACBaseObject): # Version 1.0: Initial version - VERSION = '1.0' + # Version 1.1: Inherit from rbac_db.RBACBaseObject; added 'id' and + # 'project_id'; changed 'object_id' from StringField to + # UUIDField + + VERSION = '1.1' db_model = rbac_db_models.QosPolicyRBAC - fields = { - 'object_id': obj_fields.StringField(), - 'target_tenant': obj_fields.StringField(), - 'action': obj_fields.StringField(), - } + def obj_make_compatible(self, primitive, target_version): + _target_version = versionutils.convert_version_to_tuple(target_version) + if _target_version < (1, 1): + standard_fields = ['id', 'project_id'] + for f in standard_fields: + primitive.pop(f) @base_db.NeutronObjectRegistry.register diff --git a/neutron/objects/rbac.py b/neutron/objects/rbac.py new file mode 100644 index 00000000000..b70518e5629 --- /dev/null +++ b/neutron/objects/rbac.py @@ -0,0 +1,62 @@ +# Copyright 2018 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +from oslo_versionedobjects import fields as obj_fields +from six import add_metaclass +from sqlalchemy import and_ + +from neutron.db import rbac_db_models as models +from neutron.objects import base +from neutron.objects import common_types + + +@add_metaclass(abc.ABCMeta) +class RBACBaseObject(base.NeutronDbObject): + # Version 1.0: Initial version + + VERSION = '1.0' + + fields = { + 'id': common_types.UUIDField(), + 'project_id': obj_fields.StringField(), + 'object_id': common_types.UUIDField(), + 'target_tenant': obj_fields.StringField(), + 'action': obj_fields.StringField(), + } + + fields_no_update = ['id', 'project_id', 'object_id'] + + @classmethod + def get_projects(cls, context, object_id=None, action=None, + target_tenant=None): + clauses = [] + if object_id: + clauses.append(models.NetworkRBAC.object_id == object_id) + if action: + clauses.append(models.NetworkRBAC.action == action) + if target_tenant: + clauses.append(models.NetworkRBAC.target_tenant == + target_tenant) + query = context.session.query(models.NetworkRBAC.target_tenant) + if clauses: + query = query.filter(and_(*clauses)) + return [data[0] for data in query] + + @classmethod + def get_type_class_map(cls): + return {klass.db_model.object_type: klass + for klass in cls.__subclasses__()} diff --git a/neutron/objects/rbac_db.py b/neutron/objects/rbac_db.py index fd1eef90260..d17a0226038 100644 --- a/neutron/objects/rbac_db.py +++ b/neutron/objects/rbac_db.py @@ -198,11 +198,11 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin, return callback_map[event](resource, event, trigger, context, object_type, policy, **kwargs) - def attach_rbac(self, obj_id, tenant_id, target_tenant='*'): + def attach_rbac(self, obj_id, project_id, target_tenant='*'): obj_type = self.rbac_db_cls.db_model.object_type rbac_policy = {'rbac_policy': {'object_id': obj_id, 'target_tenant': target_tenant, - 'tenant_id': tenant_id, + 'project_id': project_id, 'object_type': obj_type, 'action': models.ACCESS_SHARED}} return self.create_rbac_policy(self.obj_context, rbac_policy) @@ -244,7 +244,7 @@ def _update_hook(self, update_orig): def _create_post(self): if self.shared: - self.attach_rbac(self.id, self.obj_context.tenant_id) + self.attach_rbac(self.id, self.project_id) def _create_hook(self, orig_create): diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index 3f71d787e29..25f84cb78fe 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -169,7 +169,9 @@ class QoSPlugin(qos.QoSPluginBase): # We need to remove redundant keyword. # This cannot be done in other place of stacktrace, because neutron # needs to be backward compatible. - policy['policy'].pop('tenant_id', None) + tenant_id = policy['policy'].pop('tenant_id', None) + if not policy['policy'].get('project_id'): + policy['policy']['project_id'] = tenant_id policy_obj = policy_object.QosPolicy(context, **policy['policy']) with db_api.CONTEXT_WRITER.using(context): policy_obj.create() diff --git a/neutron/tests/unit/db/test_rbac_db_mixin.py b/neutron/tests/unit/db/test_rbac_db_mixin.py index af683d8c43a..0ff279b91a0 100644 --- a/neutron/tests/unit/db/test_rbac_db_mixin.py +++ b/neutron/tests/unit/db/test_rbac_db_mixin.py @@ -18,9 +18,13 @@ import mock from neutron_lib.callbacks import events from neutron_lib import constants from neutron_lib import context +from oslo_utils import uuidutils from neutron.db.db_base_plugin_v2 import NeutronDbPluginV2 as db_plugin_v2 +from neutron.db import rbac_db_models from neutron.extensions import rbac as ext_rbac +from neutron.objects import network as network_obj +from neutron.objects.qos import policy as qos_policy_obj from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin @@ -30,11 +34,12 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase): super(NetworkRbacTestcase, self).setUp(plugin='ml2') def _make_networkrbac(self, network, target, action='access_as_shared'): - policy = {'rbac_policy': {'tenant_id': network['network']['tenant_id'], - 'object_id': network['network']['id'], - 'object_type': 'network', - 'action': action, - 'target_tenant': target}} + policy = { + 'rbac_policy': {'project_id': network['network']['project_id'], + 'object_id': network['network']['id'], + 'object_type': 'network', + 'action': action, + 'target_tenant': target}} return policy def _setup_networkrbac_and_port(self, network, target_tenant): @@ -197,7 +202,11 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase): def test_delete_networkrbac_self_share(self): net_id = 'my-network' net_owner = 'my-tenant-id' - net = {'network': {'id': net_id, 'tenant_id': net_owner}} + # NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in + # NeutronDbPluginV2.validate_network_rbac_policy_change() + net = {'network': {'id': net_id, + 'tenant_id': net_owner, + 'project_id': net_owner}} policy = self._make_networkrbac(net, net_owner)['rbac_policy'] kwargs = {} @@ -213,7 +222,11 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase): def test_update_self_share_networkrbac(self): net_id = 'my-network' net_owner = 'my-tenant-id' - net = {'network': {'id': net_id, 'tenant_id': net_owner}} + # NOTE(ralonsoh): keep "tenant_id" for compatibility purposes in + # NeutronDbPluginV2.validate_network_rbac_policy_change() + net = {'network': {'id': net_id, + 'tenant_id': net_owner, + 'project_id': net_owner}} policy = self._make_networkrbac(net, net_owner)['rbac_policy'] kwargs = {'policy_update': {'target_tenant': 'new-target-tenant'}} @@ -225,3 +238,46 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase): None, events.BEFORE_UPDATE, None, self.context, 'network', policy, **kwargs) self.assertEqual(0, ensure.call_count) + + def _create_rbac_obj(self, _class): + return _class(id=uuidutils.generate_uuid(), + project_id='project_id', + object_id=uuidutils.generate_uuid(), + target_tenant='target_tenant', + action=rbac_db_models.ACCESS_SHARED) + + @mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects') + def test_get_rbac_policies_qos_policy(self, mock_qos_get_objects): + qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC) + mock_qos_get_objects.return_value = [qos_policy_rbac] + filters = {'object_type': ['qos_policy']} + rbac_policies = self.plugin.get_rbac_policies(self.context, filters) + self.assertEqual(1, len(rbac_policies)) + self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac), + rbac_policies[0]) + + @mock.patch.object(network_obj.NetworkRBAC, 'get_objects') + def test_get_rbac_policies_network(self, mock_net_get_objects): + net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC) + mock_net_get_objects.return_value = [net_rbac] + filters = {'object_type': ['network']} + rbac_policies = self.plugin.get_rbac_policies(self.context, filters) + self.assertEqual(1, len(rbac_policies)) + self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac), + rbac_policies[0]) + + @mock.patch.object(qos_policy_obj.QosPolicyRBAC, 'get_objects') + @mock.patch.object(network_obj.NetworkRBAC, 'get_objects') + def test_get_rbac_policies_all_classes(self, mock_net_get_objects, + mock_qos_get_objects): + net_rbac = self._create_rbac_obj(network_obj.NetworkRBAC) + qos_policy_rbac = self._create_rbac_obj(qos_policy_obj.QosPolicyRBAC) + mock_net_get_objects.return_value = [net_rbac] + mock_qos_get_objects.return_value = [qos_policy_rbac] + rbac_policies = self.plugin.get_rbac_policies(self.context) + self.assertEqual(2, len(rbac_policies)) + rbac_policies = sorted(rbac_policies, key=lambda k: k['object_type']) + self.assertEqual(self.plugin._make_rbac_policy_dict(net_rbac), + rbac_policies[0]) + self.assertEqual(self.plugin._make_rbac_policy_dict(qos_policy_rbac), + rbac_policies[1]) diff --git a/neutron/tests/unit/objects/qos/test_policy.py b/neutron/tests/unit/objects/qos/test_policy.py index f57c499ac60..f0896b925f2 100644 --- a/neutron/tests/unit/objects/qos/test_policy.py +++ b/neutron/tests/unit/objects/qos/test_policy.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +import random + import mock from neutron_lib import constants as n_const from neutron_lib.services.qos import constants as qos_consts @@ -35,7 +37,46 @@ RULE_OBJ_CLS = { } -# TODO(ihrachys): add tests for QosPolicyRBAC +class _QosPolicyRBACBase(object): + + def get_random_object_fields(self, obj_cls=None): + fields = (super(_QosPolicyRBACBase, self). + get_random_object_fields(obj_cls)) + rnd_actions = self._test_class.db_model.get_valid_actions() + idx = random.randint(0, len(rnd_actions) - 1) + fields['action'] = rnd_actions[idx] + return fields + + +class QosPolicyRBACDbObjectTestCase(_QosPolicyRBACBase, + test_base.BaseDbObjectTestCase, + testlib_api.SqlTestCase): + + _test_class = policy.QosPolicyRBAC + + def setUp(self): + super(QosPolicyRBACDbObjectTestCase, self).setUp() + for obj in self.db_objs: + policy_obj = policy.QosPolicy(self.context, + id=obj['object_id'], + project_id=obj['project_id']) + policy_obj.create() + + def _create_test_qos_policy_rbac(self): + self.objs[0].create() + return self.objs[0] + + def test_object_version_degradation_1_1_to_1_0_no_id_no_project_id(self): + qos_policy_rbac_obj = self._create_test_qos_policy_rbac() + qos_policy_rbac_dict = qos_policy_rbac_obj.obj_to_primitive('1.0') + self.assertNotIn('project_id', + qos_policy_rbac_dict['versioned_object.data']) + self.assertNotIn('id', qos_policy_rbac_dict['versioned_object.data']) + + +class QosPolicyRBACIfaceObjectTestCase(_QosPolicyRBACBase, + test_base.BaseObjectIfaceTestCase): + _test_class = policy.QosPolicyRBAC class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase): diff --git a/neutron/tests/unit/objects/test_network.py b/neutron/tests/unit/objects/test_network.py index 04ae46d3dd9..4fc1aeff728 100644 --- a/neutron/tests/unit/objects/test_network.py +++ b/neutron/tests/unit/objects/test_network.py @@ -10,8 +10,11 @@ # License for the specific language governing permissions and limitations # under the License. +import random + import mock +from neutron.db import rbac_db_models from neutron.objects import base as obj_base from neutron.objects import network from neutron.objects.qos import binding @@ -20,7 +23,59 @@ from neutron.tests.unit.objects import test_base as obj_test_base from neutron.tests.unit import testlib_api -# TODO(ihrachys): add tests for NetworkRBAC +class _NetworkRBACBase(object): + + def get_random_object_fields(self, obj_cls=None): + fields = (super(_NetworkRBACBase, self). + get_random_object_fields(obj_cls)) + rnd_actions = self._test_class.db_model.get_valid_actions() + idx = random.randint(0, len(rnd_actions) - 1) + fields['action'] = rnd_actions[idx] + return fields + + +class NetworkRBACDbObjectTestCase(_NetworkRBACBase, + obj_test_base.BaseDbObjectTestCase, + testlib_api.SqlTestCase): + + _test_class = network.NetworkRBAC + + def setUp(self): + self._mock_get_valid_actions = mock.patch.object( + rbac_db_models.NetworkRBAC, 'get_valid_actions', + return_value=(rbac_db_models.ACCESS_EXTERNAL, + rbac_db_models.ACCESS_SHARED)) + self.mock_get_valid_actions = self._mock_get_valid_actions.start() + super(NetworkRBACDbObjectTestCase, self).setUp() + for obj in self.db_objs: + net_obj = network.Network(self.context, id=obj['object_id']) + net_obj.create() + + def _create_test_network_rbac(self): + self.objs[0].create() + return self.objs[0] + + def test_object_version_degradation_1_1_to_1_0_no_id_no_project_id(self): + network_rbac_obj = self._create_test_network_rbac() + network_rbac_obj = network_rbac_obj.obj_to_primitive('1.0') + self.assertNotIn('project_id', + network_rbac_obj['versioned_object.data']) + self.assertNotIn('id', network_rbac_obj['versioned_object.data']) + + +class NetworkRBACIfaceOjectTestCase(_NetworkRBACBase, + obj_test_base.BaseObjectIfaceTestCase): + + _test_class = network.NetworkRBAC + + def setUp(self): + self._mock_get_valid_actions = mock.patch.object( + rbac_db_models.NetworkRBAC, 'get_valid_actions', + return_value=(rbac_db_models.ACCESS_EXTERNAL, + rbac_db_models.ACCESS_SHARED)) + self.mock_get_valid_actions = self._mock_get_valid_actions.start() + super(NetworkRBACIfaceOjectTestCase, self).setUp() + class NetworkDhcpAgentBindingObjectIfaceTestCase( obj_test_base.BaseObjectIfaceTestCase): diff --git a/neutron/tests/unit/objects/test_objects.py b/neutron/tests/unit/objects/test_objects.py index 2070a3a9e47..a4f5b7e81bb 100644 --- a/neutron/tests/unit/objects/test_objects.py +++ b/neutron/tests/unit/objects/test_objects.py @@ -60,7 +60,7 @@ object_data = { 'NetworkDhcpAgentBinding': '1.0-6eeceb5fb4335cd65a305016deb41c68', 'NetworkDNSDomain': '1.0-420db7910294608534c1e2e30d6d8319', 'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3', - 'NetworkRBAC': '1.1-7f8baaf9ea4257a26408454ad0065adb', + 'NetworkRBAC': '1.2-192845c5ed0718e1c54fac36936fcd7d', 'NetworkSegment': '1.0-57b7f2960971e3b95ded20cbc59244a8', 'Port': '1.4-1b6183bccfc2cd210919a1a72faefce1', 'PortBinding': '1.0-3306deeaa6deb01e33af06777d48d578', @@ -74,7 +74,7 @@ object_data = { 'QosBandwidthLimitRule': '1.3-51b662b12a8d1dfa89288d826c6d26d3', 'QosDscpMarkingRule': '1.3-0313c6554b34fd10c753cb63d638256c', 'QosMinimumBandwidthRule': '1.3-314c3419f4799067cc31cc319080adff', - 'QosPolicyRBAC': '1.0-c8a67f39809c5a3c8c7f26f2f2c620b2', + 'QosPolicyRBAC': '1.1-192845c5ed0718e1c54fac36936fcd7d', 'QosRuleType': '1.3-7286188edeb3a0386f9cf7979b9700fc', 'QosRuleTypeDriver': '1.0-7d8cb9f0ef661ac03700eae97118e3db', 'QosPolicy': '1.7-4adb0cde3102c10d8970ec9487fd7fe7', diff --git a/neutron/tests/unit/objects/test_rbac.py b/neutron/tests/unit/objects/test_rbac.py new file mode 100644 index 00000000000..7b19726ba24 --- /dev/null +++ b/neutron/tests/unit/objects/test_rbac.py @@ -0,0 +1,24 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.objects import network +from neutron.objects.qos import policy +from neutron.objects import rbac +from neutron.tests import base as neutron_test_base + + +class RBACBaseObjectTestCase(neutron_test_base.BaseTestCase): + + def test_get_type_class_map(self): + class_map = {'qos_policy': policy.QosPolicyRBAC, + 'network': network.NetworkRBAC} + self.assertEqual(class_map, rbac.RBACBaseObject.get_type_class_map())