Replace "tenant_id" with "project_id" in OVO base

This is part of the remaining technical debt of the specs
https://specs.openstack.org/openstack/neutron-specs/specs/newton/moving-to-keystone-v3.html

Blueprint: https://blueprints.launchpad.net/neutron/+spec/keystone-v3

Change-Id: I714d97449c41c9dd889d1842c0fa9b78ffa0f9f2
This commit is contained in:
Rodolfo Alonso Hernandez 2021-10-27 16:02:50 +00:00
parent cee0653145
commit 59b2ac0c2a
18 changed files with 136 additions and 129 deletions

View File

@ -970,7 +970,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
g.id for g in sg_objs
if (context.is_admin or not tenant_id or
g.tenant_id == tenant_id or
sg_obj.SecurityGroup.is_shared_with_tenant(
sg_obj.SecurityGroup.is_shared_with_project(
context, g.id, tenant_id))
)

View File

@ -63,10 +63,10 @@ class AddressGroup(rbac_db.NeutronRbacObject):
primitive.pop('shared', None)
@classmethod
def get_bound_tenant_ids(cls, context, obj_id):
def get_bound_project_ids(cls, context, obj_id):
ag_objs = securitygroup.SecurityGroupRule.get_objects(
context, remote_address_group_id=[obj_id])
return {ag.tenant_id for ag in ag_objs}
return {ag.project_id for ag in ag_objs}
@base.NeutronObjectRegistry.register

View File

@ -69,7 +69,7 @@ class AddressScope(rbac_db.NeutronRbacObject):
return None
@classmethod
def get_bound_tenant_ids(cls, context, obj_id):
def get_bound_project_ids(cls, context, obj_id):
snp_objs = subnetpool.SubnetPool.get_objects(
context, address_scope_id=obj_id, fields=['project_id'])
return {snp['project_id'] for snp in snp_objs}

View File

@ -174,6 +174,7 @@ class NeutronObject(obj_base.VersionedObject,
# is included in self.items()
if name in self.fields and name not in self.synthetic_fields:
value = self.fields[name].to_primitive(self, name, value)
# TODO(ralonsoh): remove once bp/keystone-v3 migration finishes.
if name == 'tenant_id':
if ('project_id' in self.fields and
not self.obj_attr_is_set('project_id')):
@ -343,6 +344,7 @@ class DeclarativeObject(abc.ABCMeta):
def __init__(cls, name, bases, dct):
super(DeclarativeObject, cls).__init__(name, bases, dct)
# TODO(ralonsoh): remove once bp/keystone-v3 migration finishes.
if 'project_id' in cls.fields:
obj_extra_fields_set = set(cls.obj_extra_fields)
obj_extra_fields_set.add('tenant_id')
@ -389,6 +391,7 @@ class DeclarativeObject(abc.ABCMeta):
standardattributes.add_tag_filter_names(cls)
# Instantiate extra filters per class
cls.extra_filter_names = set(cls.extra_filter_names)
# TODO(ralonsoh): remove once bp/keystone-v3 migration finishes.
# add tenant_id filter for objects that have project_id
if 'project_id' in cls.fields and 'tenant_id' not in cls.fields:
cls.extra_filter_names.add('tenant_id')
@ -750,7 +753,7 @@ class NeutronDbObject(NeutronObject, metaclass=DeclarativeObject):
@classmethod
def is_accessible(cls, context, db_obj):
return (context.is_admin or
context.tenant_id == db_obj.tenant_id)
context.project_id == db_obj.project_id)
@staticmethod
def filter_to_str(value):

View File

@ -326,7 +326,7 @@ class Network(rbac_db.NeutronRbacObject):
self.obj_reset_changes(['qos_policy_id'])
@classmethod
def get_bound_tenant_ids(cls, context, policy_id):
def get_bound_project_ids(cls, context, policy_id):
# TODO(ihrachys): provide actual implementation
return set()

View File

@ -77,9 +77,8 @@ class NetworkSegmentRange(base.NeutronDbObject):
# fields
_dict.update({'available': self._get_available_allocation()})
_dict.update({'used': self._get_used_allocation_mapping()})
# NOTE(ralonsoh): this workaround should be removed once the migration
# from "tenant_id" to "project_id" is finished.
_dict = db_utils.resource_fields(_dict, fields)
# TODO(ralonsoh): remove once bp/keystone-v3 migration finishes.
_dict.pop('tenant_id', None)
resource_extend.apply_funcs(nsr_def.COLLECTION_NAME, _dict,
self.db_obj)

View File

@ -341,18 +341,18 @@ class QosPolicy(rbac_db.NeutronRbacObject):
self.obj_context, policy_id=self.id)
@classmethod
def _get_bound_tenant_ids(cls, session, binding_db, bound_db,
def _get_bound_project_ids(cls, session, binding_db, bound_db,
binding_db_id_column, policy_id):
return list(itertools.chain.from_iterable(
session.query(bound_db.tenant_id).join(
session.query(bound_db.project_id).join(
binding_db, bound_db.id == binding_db_id_column).filter(
binding_db.policy_id == policy_id).all()))
@classmethod
def get_bound_tenant_ids(cls, context, policy_id):
"""Implements RbacNeutronObject.get_bound_tenant_ids.
def get_bound_project_ids(cls, context, policy_id):
"""Implements RbacNeutronObject.get_bound_project_ids.
:returns: set -- a set of tenants' ids dependent on QosPolicy.
:returns: set -- a set of projects' ids dependent on QosPolicy.
"""
net = models_v2.Network
qosnet = qos_db_model.QosNetworkPolicyBinding
@ -362,20 +362,20 @@ class QosPolicy(rbac_db.NeutronRbacObject):
qosfip = qos_db_model.QosFIPPolicyBinding
router = l3.Router
qosrouter = qos_db_model.QosRouterGatewayIPPolicyBinding
bound_tenants = []
bound_projects = []
with cls.db_context_reader(context):
bound_tenants.extend(cls._get_bound_tenant_ids(
bound_projects.extend(cls._get_bound_project_ids(
context.session, qosnet, net, qosnet.network_id, policy_id))
bound_tenants.extend(
cls._get_bound_tenant_ids(context.session, qosport, port,
qosport.port_id, policy_id))
bound_tenants.extend(
cls._get_bound_tenant_ids(context.session, qosfip, fip,
qosfip.fip_id, policy_id))
bound_tenants.extend(
cls._get_bound_tenant_ids(context.session, qosrouter, router,
qosrouter.router_id, policy_id))
return set(bound_tenants)
bound_projects.extend(
cls._get_bound_project_ids(context.session, qosport, port,
qosport.port_id, policy_id))
bound_projects.extend(
cls._get_bound_project_ids(context.session, qosfip, fip,
qosfip.fip_id, policy_id))
bound_projects.extend(
cls._get_bound_project_ids(context.session, qosrouter, router,
qosrouter.router_id, policy_id))
return set(bound_projects)
def obj_make_compatible(self, primitive, target_version):
def filter_rules(obj_names, rules):

View File

@ -27,6 +27,7 @@ class RBACBaseObject(base.NeutronDbObject, metaclass=abc.ABCMeta):
VERSION = '1.0'
# TODO(ralonsoh): move 'target_tenant' to 'target_project'.
fields = {
'id': common_types.UUIDField(),
'project_id': obj_fields.StringField(),

View File

@ -39,23 +39,23 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
@classmethod
@abc.abstractmethod
def get_bound_tenant_ids(cls, context, obj_id):
"""Returns ids of all tenants depending on this db object.
def get_bound_project_ids(cls, context, obj_id):
"""Returns ids of all projects depending on this db object.
Has to be implemented by classes using RbacNeutronMetaclass.
The tenants are the ones that need the sharing or 'visibility' of the
object to them. E.g: for QosPolicy that would be the tenants using the
The projects are the ones that need the sharing or 'visibility' of the
object to them. E.g: for QosPolicy that would be the projects using the
Networks and Ports with the shared QosPolicy applied to them.
:returns: set -- a set of tenants' ids dependent on this object.
:returns: set -- a set of projects' ids dependent on this object.
"""
@staticmethod
def is_network_shared(context, rbac_entries):
# NOTE(korzen) this method is copied from db_base_plugin_common.
# The shared attribute for a network now reflects if the network
# is shared to the calling tenant via an RBAC entry.
matches = ('*',) + ((context.tenant_id,) if context else ())
# is shared to the calling project via an RBAC entry.
matches = ('*',) + ((context.project_id,) if context else ())
for entry in rbac_entries:
if (entry.action == models.ACCESS_SHARED and
entry.target_tenant in matches):
@ -63,7 +63,7 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
return False
@staticmethod
def get_shared_with_tenant(context, rbac_db_cls, obj_id, tenant_id):
def get_shared_with_project(context, rbac_db_cls, obj_id, project_id):
# NOTE(korzen) This method enables to query within already started
# session
rbac_db_model = rbac_db_cls.db_model
@ -71,21 +71,21 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
and_(rbac_db_model.object_id == obj_id,
rbac_db_model.action == models.ACCESS_SHARED,
rbac_db_model.target_tenant.in_(
['*', tenant_id]))).count() != 0)
['*', project_id]))).count() != 0)
@classmethod
def is_shared_with_tenant(cls, context, obj_id, tenant_id):
def is_shared_with_project(cls, context, obj_id, project_id):
ctx = context.elevated()
with cls.db_context_reader(ctx):
return cls.get_shared_with_tenant(ctx, cls.rbac_db_cls,
obj_id, tenant_id)
return cls.get_shared_with_project(ctx, cls.rbac_db_cls,
obj_id, project_id)
@classmethod
def is_accessible(cls, context, db_obj):
return (super(
RbacNeutronDbObjectMixin, cls).is_accessible(context, db_obj) or
cls.is_shared_with_tenant(context, db_obj.id,
context.tenant_id))
cls.is_shared_with_project(context, db_obj.id,
context.project_id))
@classmethod
def _get_db_obj_rbac_entries(cls, context, rbac_obj_id, rbac_action):
@ -95,7 +95,7 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
rbac_db_model.action == rbac_action))
@classmethod
def _get_tenants_with_shared_access_to_db_obj(cls, context, obj_id):
def _get_projects_with_shared_access_to_db_obj(cls, context, obj_id):
rbac_db_model = cls.rbac_db_cls.db_model
return set(itertools.chain.from_iterable(context.session.query(
rbac_db_model.target_tenant).filter(
@ -107,14 +107,14 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
def _validate_rbac_policy_delete(cls, context, obj_id, target_tenant):
ctx_admin = context.elevated()
rb_model = cls.rbac_db_cls.db_model
bound_tenant_ids = cls.get_bound_tenant_ids(ctx_admin, obj_id)
bound_project_ids = cls.get_bound_project_ids(ctx_admin, obj_id)
db_obj_sharing_entries = cls._get_db_obj_rbac_entries(
ctx_admin, obj_id, models.ACCESS_SHARED)
def raise_policy_in_use():
raise ext_rbac.RbacPolicyInUse(
object_id=obj_id,
details='tenant_id={}'.format(target_tenant))
details='project_id={}'.format(target_tenant))
if target_tenant != '*':
# if there is a wildcard rule, we can return early because it
@ -123,15 +123,15 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
rb_model.target_tenant == '*')
if wildcard_sharing_entries.count():
return
if target_tenant in bound_tenant_ids:
if target_tenant in bound_project_ids:
raise_policy_in_use()
return
# for the wildcard we need to query all of the rbac entries to
# see if any allow the object sharing
other_target_tenants = cls._get_tenants_with_shared_access_to_db_obj(
other_target_tenants = cls._get_projects_with_shared_access_to_db_obj(
ctx_admin, obj_id)
if not bound_tenant_ids.issubset(other_target_tenants):
if not bound_project_ids.issubset(other_target_tenants):
raise_policy_in_use()
@classmethod
@ -149,7 +149,7 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
target_tenant = policy['target_tenant']
db_obj = obj_db_api.get_object(
cls, context.elevated(), id=policy['object_id'])
if db_obj.tenant_id == target_tenant:
if db_obj.project_id == target_tenant:
return
cls._validate_rbac_policy_delete(context=context,
obj_id=policy['object_id'],
@ -171,11 +171,11 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
"""
policy = payload.latest_state
prev_tenant = policy['target_tenant']
new_tenant = payload.request_body['target_tenant']
if prev_tenant == new_tenant:
prev_project = policy['target_tenant']
new_project = payload.request_body['target_tenant']
if prev_project == new_project:
return
if new_tenant != '*':
if new_project != '*':
return cls.validate_rbac_policy_delete(
resource, event, trigger, payload=payload)
@ -203,7 +203,7 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
cls, context.elevated(), id=policy['object_id'])
if event in (events.BEFORE_CREATE, events.BEFORE_UPDATE):
if (not context.is_admin and
db_obj['tenant_id'] != context.tenant_id):
db_obj['project_id'] != context.project_id):
msg = _("Only admins can manipulate policies on objects "
"they do not own")
raise exceptions.InvalidInput(error_message=msg)
@ -235,7 +235,7 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
# 'shared' goes False -> True
if not is_shared_prev and is_shared_new:
self.attach_rbac(obj_id, self.obj_context.tenant_id)
self.attach_rbac(obj_id, self.obj_context.project_id)
return
# 'shared' goes True -> False is actually an attempt to delete
@ -265,7 +265,7 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
# NOTE(korzen) this case is used when object was
# instantiated and without DB interaction (get_object(s), update,
# create), it should be rare case to load 'shared' by that method
shared = self.get_shared_with_tenant(
shared = self.get_shared_with_project(
self.obj_context.elevated(),
self.rbac_db_cls,
self.id,
@ -305,9 +305,8 @@ def _create_hook(self, orig_create):
def _to_dict_hook(self, to_dict_orig):
dct = to_dict_orig(self)
if self.obj_context:
dct['shared'] = self.is_shared_with_tenant(self.obj_context,
self.id,
self.obj_context.tenant_id)
dct['shared'] = self.is_shared_with_project(
self.obj_context, self.id, self.obj_context.project_id)
else:
# most OVO objects on an agent will not have a context set on the
# object because they will be generated from obj_from_primitive.

View File

@ -125,10 +125,10 @@ class SecurityGroup(rbac_db.NeutronRbacObject):
filter_normalized_cidr_from_rules(primitive['rules'])
@classmethod
def get_bound_tenant_ids(cls, context, obj_id):
def get_bound_project_ids(cls, context, obj_id):
port_objs = ports.Port.get_objects(context,
security_group_ids=[obj_id])
return {port.tenant_id for port in port_objs}
return {port.project_id for port in port_objs}
@base.NeutronObjectRegistry.register

View File

@ -273,10 +273,10 @@ class Subnet(base.NeutronDbObject):
# instantiated and without DB interaction (get_object(s), update,
# create), it should be rare case to load 'shared' by that method
shared = (rbac_db.RbacNeutronDbObjectMixin.
get_shared_with_tenant(self.obj_context.elevated(),
network.NetworkRBAC,
self.network_id,
self.project_id))
get_shared_with_project(self.obj_context.elevated(),
network.NetworkRBAC,
self.network_id,
self.project_id))
setattr(self, 'shared', shared)
self.obj_reset_changes(['shared'])

View File

@ -105,7 +105,7 @@ class SubnetPool(rbac_db.NeutronRbacObject):
self._attach_prefixes(fields['prefixes'])
@classmethod
def get_bound_tenant_ids(cls, context, obj_id):
def get_bound_project_ids(cls, context, obj_id):
sn_objs = subnet.Subnet.get_objects(context, subnetpool_id=obj_id)
return {snp.project_id for snp in sn_objs}

View File

@ -94,7 +94,7 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
self.rcache.record_resource_update(self.ctx, 'Port', p)
return p
@mock.patch.object(address_group.AddressGroup, 'is_shared_with_tenant',
@mock.patch.object(address_group.AddressGroup, 'is_shared_with_project',
return_value=False)
def _make_address_group_ovo(self, *args, **kwargs):
id = uuidutils.generate_uuid()
@ -116,7 +116,7 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
ag)
return ag
@mock.patch.object(securitygroup.SecurityGroup, 'is_shared_with_tenant',
@mock.patch.object(securitygroup.SecurityGroup, 'is_shared_with_project',
return_value=False)
def _make_security_group_ovo(self, *args, **kwargs):
attrs = {'id': uuidutils.generate_uuid(), 'revision_number': 1}

View File

@ -148,8 +148,8 @@ class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
self._test_class, self.context, id='fake_id')
def test_to_dict_makes_primitive_field_value(self):
# is_shared_with_tenant requires DB
with mock.patch.object(self._test_class, 'is_shared_with_tenant',
# is_shared_with_project requires DB
with mock.patch.object(self._test_class, 'is_shared_with_project',
return_value=False):
(super(QosPolicyObjectTestCase, self).
test_to_dict_makes_primitive_field_value())
@ -460,10 +460,10 @@ class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
policy_obj._reload_is_default()
self.assertTrue(policy_obj.is_default)
def test_get_bound_tenant_ids_returns_set_of_tenant_ids(self):
def test_get_bound_project_ids_returns_set_of_project_ids(self):
obj = self._create_test_policy()
obj.attach_port(self._port['id'])
ids = self._test_class.get_bound_tenant_ids(self.context, obj['id'])
ids = self._test_class.get_bound_project_ids(self.context, obj['id'])
self.assertEqual(ids.pop(), self._port.project_id)
self.assertEqual(len(ids), 0)

View File

@ -387,6 +387,8 @@ class FakeNeutronObjectWithProjectId(base.NeutronDbObject):
'field2': common_types.UUIDField(),
}
fields_no_update = ['id', 'project_id']
@base.NeutronObjectRegistry.register_if(False)
class FakeNeutronObject(base.NeutronObject):
@ -750,10 +752,10 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
if getattr(self._test_class.rbac_db_cls, 'db_model', None):
mock.patch.object(
rbac_db.RbacNeutronDbObjectMixin,
'is_shared_with_tenant', return_value=False).start()
'is_shared_with_project', return_value=False).start()
mock.patch.object(
rbac_db.RbacNeutronDbObjectMixin,
'get_shared_with_tenant', return_value=False).start()
'get_shared_with_project', return_value=False).start()
def fake_get_object(self, context, model, **kwargs):
objs = self.model_map[model]
@ -1189,6 +1191,7 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
dict_ = obj.to_dict()
self.assertNotIn('project_id', dict_)
# TODO(ralonsoh): remove once bp/keystone-v3 migration finishes.
self.assertNotIn('tenant_id', dict_)
def test_fields_no_update(self):
@ -1196,13 +1199,13 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
for field in self._test_class.fields_no_update:
self.assertTrue(hasattr(obj, field))
def test_get_tenant_id(self):
def test_get_project_id(self):
if not hasattr(self._test_class, 'project_id'):
self.skipTest(
'Test class %r has no project_id field' % self._test_class)
obj = self._test_class(self.context, **self.obj_fields[0])
project_id = self.obj_fields[0]['project_id']
self.assertEqual(project_id, obj.tenant_id)
self.assertEqual(project_id, obj.project_id)
# Adding delete_objects mock because some objects are using delete_objects
# while calling update(), Port for example
@ -1407,22 +1410,22 @@ class BaseObjectIfaceWithProjectIdTestCase(BaseObjectIfaceTestCase):
_test_class = FakeNeutronObjectWithProjectId
def test_update_fields_using_tenant_id(self):
def test_update_fields_using_project_id(self):
obj = self._test_class(self.context, **self.obj_fields[0])
obj.obj_reset_changes()
tenant_id = obj['tenant_id']
project_id = obj['project_id']
new_obj_fields = dict()
new_obj_fields['tenant_id'] = uuidutils.generate_uuid()
new_obj_fields['project_id'] = uuidutils.generate_uuid()
new_obj_fields['field2'] = uuidutils.generate_uuid()
obj.update_fields(new_obj_fields)
self.assertEqual(set(['field2']), obj.obj_what_changed())
self.assertEqual(tenant_id, obj.project_id)
self.assertEqual(project_id, obj.project_id)
def test_tenant_id_filter_added_when_project_id_present(self):
def test_project_id_filter_added_when_project_id_present(self):
self._test_class.get_objects(
self.context, tenant_id=self.obj_fields[0]['project_id'])
self.context, project_id=self.obj_fields[0]['project_id'])
class BaseDbObjectMultipleForeignKeysTestCase(_BaseObjectTestCase,

View File

@ -73,7 +73,7 @@ class FakeNeutronDbObject(rbac_db.NeutronRbacObject):
synthetic_fields = ['field2']
def get_bound_tenant_ids(cls, context, policy_id):
def get_bound_project_ids(cls, context, policy_id):
pass
@ -86,44 +86,46 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
FakeNeutronDbObject.update_post = mock.Mock()
@mock.patch.object(_test_class.rbac_db_cls, 'db_model')
def test_get_tenants_with_shared_access_to_db_obj_return_tenant_ids(
def test_get_projects_with_shared_access_to_db_obj_return_project_ids(
self, *mocks):
ctx = mock.Mock()
fake_ids = {'tenant_id_' + str(i) for i in range(10)}
fake_ids = {'project_id_' + str(i) for i in range(10)}
ctx.session.query.return_value.filter.return_value = [
(fake_id,) for fake_id in fake_ids]
ret_ids = self._test_class._get_tenants_with_shared_access_to_db_obj(
ret_ids = self._test_class._get_projects_with_shared_access_to_db_obj(
ctx, 'fake_db_obj_id')
self.assertEqual(fake_ids, ret_ids)
def test_is_accessible_for_admin(self):
ctx = mock.Mock(is_admin=True, tenant_id='we_dont_care')
ctx = mock.Mock(is_admin=True, project_id='we_dont_care')
self.assertTrue(self._test_class.is_accessible(ctx, None))
def test_is_accessible_for_db_object_owner(self):
ctx = mock.Mock(is_admin=False, tenant_id='db_object_owner')
db_obj = mock.Mock(tenant_id=ctx.tenant_id)
ctx = mock.Mock(is_admin=False, project_id='db_object_owner')
db_obj = mock.Mock(project_id=ctx.project_id)
self.assertTrue(self._test_class.is_accessible(ctx, db_obj))
@mock.patch.object(_test_class, 'is_shared_with_tenant', return_value=True)
def test_is_accessible_if_shared_with_tenant(self, mock_is_shared):
ctx = mock.Mock(is_admin=False, tenant_id='db_object_shareholder')
db_obj = mock.Mock(tenant_id='db_object_owner')
@mock.patch.object(_test_class, 'is_shared_with_project',
return_value=True)
def test_is_accessible_if_shared_with_project(self, mock_is_shared):
ctx = mock.Mock(is_admin=False, project_id='db_object_shareholder')
db_obj = mock.Mock(project_id='db_object_owner')
self.assertTrue(self._test_class.is_accessible(ctx, db_obj))
mock_is_shared.assert_called_once_with(
mock.ANY, db_obj.id, ctx.tenant_id)
mock.ANY, db_obj.id, ctx.project_id)
@mock.patch.object(_test_class, 'is_shared_with_tenant',
@mock.patch.object(_test_class, 'is_shared_with_project',
return_value=False)
def test_is_accessible_fails_for_unauthorized_tenant(self, mock_is_shared):
ctx = mock.Mock(is_admin=False, tenant_id='Billy_the_kid')
db_obj = mock.Mock(tenant_id='db_object_owner')
def test_is_accessible_fails_for_unauthorized_project(self,
mock_is_shared):
ctx = mock.Mock(is_admin=False, project_id='Billy_the_kid')
db_obj = mock.Mock(project_id='db_object_owner')
self.assertFalse(self._test_class.is_accessible(ctx, db_obj))
mock_is_shared.assert_called_once_with(
mock.ANY, db_obj.id, ctx.tenant_id)
mock.ANY, db_obj.id, ctx.project_id)
def _rbac_policy_generate_change_events(self, resource, trigger,
context, object_type, policy,
@ -151,10 +153,10 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
@mock.patch.object(obj_db_api, 'get_object',
return_value={'tenant_id': 'tyrion_lannister'})
return_value={'project_id': 'tyrion_lannister'})
def test_validate_rbac_policy_change_allowed_for_admin_or_owner(
self, mock_get_object, mock_validate_update):
context = mock.Mock(is_admin=True, tenant_id='db_obj_owner_id')
context = mock.Mock(is_admin=True, project_id='db_obj_owner_id')
self._rbac_policy_generate_change_events(
resource=None, trigger='dummy_trigger', context=context,
object_type=self._test_class.rbac_db_cls.db_model.object_type,
@ -165,10 +167,10 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
@mock.patch.object(obj_db_api, 'get_object',
return_value={'tenant_id': 'king_beyond_the_wall'})
return_value={'project_id': 'king_beyond_the_wall'})
def test_validate_rbac_policy_change_forbidden_for_outsiders(
self, mock_get_object, mock_validate_update):
context = mock.Mock(is_admin=False, tenant_id='db_obj_owner_id')
context = mock.Mock(is_admin=False, project_id='db_obj_owner_id')
self.assertRaises(
n_exc.InvalidInput,
self._rbac_policy_generate_change_events,
@ -200,20 +202,20 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
def test_validate_rbac_policy_delete_skips_db_object_owner(self,
mock_get_object):
policy = {'action': rbac_db_models.ACCESS_SHARED,
'target_tenant': 'fake_tenant_id',
'target_tenant': 'fake_project_id',
'object_id': 'fake_obj_id',
'tenant_id': 'fake_tenant_id'}
mock_get_object.return_value.tenant_id = policy['target_tenant']
'project_id': 'fake_project_id'}
mock_get_object.return_value.project_id = policy['target_tenant']
self._test_validate_rbac_policy_delete_handles_policy(policy)
@mock.patch.object(obj_db_api, 'get_object')
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
return_value='tenant_id_shared_with')
def test_validate_rbac_policy_delete_fails_single_tenant_and_in_use(
self, get_bound_tenant_ids_mock, mock_get_object):
@mock.patch.object(_test_class, 'get_bound_project_ids',
return_value='project_id_shared_with')
def test_validate_rbac_policy_delete_fails_single_project_and_in_use(
self, get_bound_project_ids_mock, mock_get_object):
policy = {'action': rbac_db_models.ACCESS_SHARED,
'target_tenant': 'tenant_id_shared_with',
'tenant_id': 'object_owner_tenant_id',
'target_tenant': 'project_id_shared_with',
'project_id': 'object_owner_project_id',
'object_id': 'fake_obj_id'}
context = mock.Mock()
with mock.patch.object(
@ -235,16 +237,16 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
trigger='dummy_trigger',
payload=payload)
def test_validate_rbac_policy_delete_not_bound_tenant_success(self):
def test_validate_rbac_policy_delete_not_bound_project_success(self):
context = mock.Mock()
with mock.patch.object(
self._test_class, 'get_bound_tenant_ids',
self._test_class, 'get_bound_project_ids',
return_value={'fake_tid2', 'fake_tid3'}), \
mock.patch.object(self._test_class,
'_get_db_obj_rbac_entries') as get_rbac_entries_mock, \
mock.patch.object(
self._test_class,
'_get_tenants_with_shared_access_to_db_obj') as sh_tids:
'_get_projects_with_shared_access_to_db_obj') as sh_tids:
get_rbac_entries_mock.filter.return_value.count.return_value = 0
self._test_class._validate_rbac_policy_delete(
context=context,
@ -254,16 +256,16 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
@mock.patch.object(_test_class, '_get_db_obj_rbac_entries')
@mock.patch.object(_test_class,
'_get_tenants_with_shared_access_to_db_obj',
return_value=['some_other_tenant'])
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
'_get_projects_with_shared_access_to_db_obj',
return_value=['some_other_project'])
@mock.patch.object(_test_class, 'get_bound_project_ids',
return_value={'fake_id1'})
def test_validate_rbac_policy_delete_fails_single_used_wildcarded(
self, get_bound_tenant_ids_mock, mock_tenants_with_shared_access,
self, get_bound_project_ids_mock, mock_projects_with_shared_access,
_get_db_obj_rbac_entries_mock):
policy = {'action': rbac_db_models.ACCESS_SHARED,
'target_tenant': '*',
'tenant_id': 'object_owner_tenant_id',
'project_id': 'object_owner_project_id',
'object_id': 'fake_obj_id'}
context = mock.Mock()
payload = events.DBEventPayload(
@ -310,7 +312,7 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
attach_rbac_mock.assert_called_with(
obj_id, test_neutron_obj.obj_context.tenant_id)
obj_id, test_neutron_obj.obj_context.project_id)
def test_shared_field_false_without_context(self):
test_neutron_obj = self._test_class()
@ -335,9 +337,9 @@ class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
@mock.patch.object(_test_class, 'create_rbac_policy')
def test_attach_rbac_returns_type(self, create_rbac_mock):
obj_id = 'fake_obj_id'
tenant_id = 'fake_tenant_id'
target_tenant = 'fake_target_tenant'
self._test_class(mock.Mock()).attach_rbac(obj_id, tenant_id,
project_id = 'fake_project_id'
target_tenant = 'fake_target_project'
self._test_class(mock.Mock()).attach_rbac(obj_id, project_id,
target_tenant)
rbac_pol = create_rbac_mock.call_args_list[0][0][1]['rbac_policy']
self.assertEqual(rbac_pol['object_id'], obj_id)

View File

@ -138,10 +138,10 @@ class SubnetObjectIfaceTestCase(obj_test_base.BaseObjectIfaceTestCase):
# which is not allowed in 'Iface' test classes.
mock.patch.object(
rbac_db.RbacNeutronDbObjectMixin,
'is_shared_with_tenant', return_value=False).start()
'is_shared_with_project', return_value=False).start()
mock.patch.object(
rbac_db.RbacNeutronDbObjectMixin,
'get_shared_with_tenant').start()
'get_shared_with_project').start()
class SubnetDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
@ -192,7 +192,7 @@ class SubnetDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
obj = self._make_object(subnet_data)
# check if shared will be load by 'obj_load_attr' and using extra query
# by RbacNeutronDbObjectMixin get_shared_with_tenant
# by RbacNeutronDbObjectMixin get_shared_with_project
self.assertTrue(obj.shared)
obj.create()
# here the shared should be load by is_network_shared

View File

@ -84,7 +84,7 @@ class SubnetPoolDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
@mock.patch.object(obj_db_api, 'get_object')
def test_rbac_policy_create_no_address_scope(self, mock_get_object,
mock_query_with_hooks):
context = mock.Mock(is_admin=False, tenant_id='db_obj_owner_id')
context = mock.Mock(is_admin=False, project_id='db_obj_owner_id')
payload = mock.Mock(
context=context, request_body=dict(object_id="fake_id")
)
@ -114,8 +114,8 @@ class SubnetPoolDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
@mock.patch.object(obj_db_api, 'get_object')
def test_rbac_policy_create_no_matching_policies(self, mock_get_object,
mock_query_with_hooks):
context = mock.Mock(is_admin=False, tenant_id='db_obj_owner_id')
fake_project_id = "fake_target_tenant_id"
context = mock.Mock(is_admin=False, project_id='db_obj_owner_id')
fake_project_id = "fake_target_project_id"
payload = mock.Mock(
context=context, request_body=dict(
object_id="fake_id",
@ -146,8 +146,8 @@ class SubnetPoolDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
@mock.patch.object(obj_db_api, 'get_object')
def test_rbac_policy_create_valid(self, mock_get_object,
mock_query_with_hooks):
context = mock.Mock(is_admin=False, tenant_id='db_obj_owner_id')
fake_project_id = "fake_target_tenant_id"
context = mock.Mock(is_admin=False, project_id='db_obj_owner_id')
fake_project_id = "fake_target_project_id"
payload = mock.Mock(
context=context, request_body=dict(
object_id="fake_id",