ae5bad49cc
Related-Blueprint: neutron-lib Change-Id: Ia014468bd621c4ee6aea95bf19328c61070174c4
315 lines
14 KiB
Python
315 lines
14 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import mock
|
|
|
|
from neutron_lib import exceptions as n_exc
|
|
from oslo_versionedobjects import base as obj_base
|
|
from oslo_versionedobjects import fields as obj_fields
|
|
from six import add_metaclass
|
|
import sqlalchemy as sa
|
|
|
|
from neutron.callbacks import events
|
|
from neutron.db import model_base
|
|
from neutron.db import rbac_db_models
|
|
from neutron.extensions import rbac as ext_rbac
|
|
from neutron.objects import base
|
|
from neutron.objects.db import api as obj_db_api
|
|
from neutron.objects import rbac_db
|
|
from neutron.tests.unit.objects import test_base
|
|
from neutron.tests.unit import testlib_api
|
|
|
|
|
|
class FakeDbModel(object):
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
|
|
|
|
class FakeRbacModel(rbac_db_models.RBACColumns, model_base.BASEV2):
|
|
object_id = sa.Column(sa.String(36), nullable=False)
|
|
object_type = 'fake_rbac_object'
|
|
|
|
def get_valid_actions(self):
|
|
return (rbac_db_models.ACCESS_SHARED,)
|
|
|
|
|
|
@obj_base.VersionedObjectRegistry.register_if(False)
|
|
@add_metaclass(rbac_db.RbacNeutronMetaclass)
|
|
class FakeNeutronDbObject(base.NeutronDbObject):
|
|
# Version 1.0: Initial version
|
|
VERSION = '1.0'
|
|
|
|
rbac_db_model = FakeRbacModel
|
|
db_model = FakeDbModel
|
|
|
|
fields = {
|
|
'id': obj_fields.UUIDField(),
|
|
'field1': obj_fields.StringField(),
|
|
'field2': obj_fields.StringField(),
|
|
'shared': obj_fields.BooleanField(default=False),
|
|
}
|
|
|
|
fields_no_update = ['id']
|
|
|
|
synthetic_fields = ['field2']
|
|
|
|
def get_bound_tenant_ids(cls, context, policy_id):
|
|
pass
|
|
|
|
|
|
class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase,
|
|
testlib_api.SqlTestCase):
|
|
_test_class = FakeNeutronDbObject
|
|
|
|
def setUp(self):
|
|
super(RbacNeutronDbObjectTestCase, self).setUp()
|
|
FakeNeutronDbObject.update_post = mock.Mock()
|
|
|
|
@mock.patch.object(_test_class, 'rbac_db_model')
|
|
def test_get_tenants_with_shared_access_to_db_obj_return_tenant_ids(
|
|
self, *mocks):
|
|
ctx = mock.Mock()
|
|
fake_ids = {'tenant_id_' + str(i) for i in range(10)}
|
|
ctx.session.query.return_value.filter.return_value = [
|
|
(fake_id,) for fake_id in fake_ids]
|
|
ret_ids = self._test_class._get_tenants_with_shared_access_to_db_obj(
|
|
ctx, 'fake_db_obj_id')
|
|
self.assertEqual(fake_ids, ret_ids)
|
|
|
|
def test_is_accessible_for_admin(self):
|
|
ctx = mock.Mock(is_admin=True, tenant_id='we_dont_care')
|
|
self.assertTrue(self._test_class.is_accessible(ctx, None))
|
|
|
|
def test_is_accessible_for_db_object_owner(self):
|
|
ctx = mock.Mock(is_admin=False, tenant_id='db_object_owner')
|
|
db_obj = mock.Mock(tenant_id=ctx.tenant_id)
|
|
|
|
self.assertTrue(self._test_class.is_accessible(ctx, db_obj))
|
|
|
|
@mock.patch.object(_test_class, 'is_shared_with_tenant', return_value=True)
|
|
def test_is_accessible_if_shared_with_tenant(self, mock_is_shared):
|
|
ctx = mock.Mock(is_admin=False, tenant_id='db_object_shareholder')
|
|
db_obj = mock.Mock(tenant_id='db_object_owner')
|
|
|
|
self.assertTrue(self._test_class.is_accessible(ctx, db_obj))
|
|
mock_is_shared.assert_called_once_with(
|
|
mock.ANY, db_obj.id, ctx.tenant_id)
|
|
|
|
@mock.patch.object(_test_class, 'is_shared_with_tenant',
|
|
return_value=False)
|
|
def test_is_accessible_fails_for_unauthorized_tenant(self, mock_is_shared):
|
|
ctx = mock.Mock(is_admin=False, tenant_id='Billy_the_kid')
|
|
db_obj = mock.Mock(tenant_id='db_object_owner')
|
|
|
|
self.assertFalse(self._test_class.is_accessible(ctx, db_obj))
|
|
mock_is_shared.assert_called_once_with(
|
|
mock.ANY, db_obj.id, ctx.tenant_id)
|
|
|
|
def _rbac_policy_generate_change_events(self, resource, trigger,
|
|
context, object_type, policy,
|
|
event_list):
|
|
for event in event_list:
|
|
self._test_class.validate_rbac_policy_change(
|
|
resource, event, trigger, context, object_type, policy)
|
|
|
|
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
|
|
def test_validate_rbac_policy_change_handles_only_object_type(
|
|
self, mock_validate_rbac_update):
|
|
self._rbac_policy_generate_change_events(
|
|
resource=None, trigger='dummy_trigger', context=None,
|
|
object_type='dummy_object_type', policy=None,
|
|
event_list=(events.BEFORE_CREATE, events.BEFORE_UPDATE,
|
|
events.BEFORE_DELETE))
|
|
|
|
mock_validate_rbac_update.assert_not_called()
|
|
|
|
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
|
|
@mock.patch.object(_test_class, 'get_object',
|
|
return_value={'tenant_id': 'tyrion_lannister'})
|
|
def test_validate_rbac_policy_change_allowed_for_admin_or_owner(
|
|
self, mock_get_object, mock_validate_update):
|
|
context = mock.Mock(is_admin=True, tenant_id='db_obj_owner_id')
|
|
self._rbac_policy_generate_change_events(
|
|
resource=None, trigger='dummy_trigger', context=context,
|
|
object_type=self._test_class.rbac_db_model.object_type,
|
|
policy={'object_id': 'fake_object_id'},
|
|
event_list=(events.BEFORE_CREATE, events.BEFORE_UPDATE))
|
|
|
|
self.assertTrue(self._test_class.validate_rbac_policy_update.called)
|
|
|
|
@mock.patch.object(_test_class, 'validate_rbac_policy_update')
|
|
@mock.patch.object(_test_class, 'get_object',
|
|
return_value={'tenant_id': 'king_beyond_the_wall'})
|
|
def test_validate_rbac_policy_change_forbidden_for_outsiders(
|
|
self, mock_get_object, mock_validate_update):
|
|
context = mock.Mock(is_admin=False, tenant_id='db_obj_owner_id')
|
|
self.assertRaises(
|
|
n_exc.InvalidInput,
|
|
self._rbac_policy_generate_change_events,
|
|
resource=mock.Mock(), trigger='dummy_trigger', context=context,
|
|
object_type=self._test_class.rbac_db_model.object_type,
|
|
policy={'object_id': 'fake_object_id'},
|
|
event_list=(events.BEFORE_CREATE, events.BEFORE_UPDATE))
|
|
self.assertFalse(mock_validate_update.called)
|
|
|
|
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
|
|
def _test_validate_rbac_policy_delete_handles_policy(
|
|
self, policy, mock_validate_delete):
|
|
self._test_class.validate_rbac_policy_delete(
|
|
resource=mock.Mock(), event=events.BEFORE_DELETE,
|
|
trigger='dummy_trigger', context=mock.Mock(),
|
|
object_type=self._test_class.rbac_db_model.object_type,
|
|
policy=policy)
|
|
mock_validate_delete.assert_not_called()
|
|
|
|
def test_validate_rbac_policy_delete_handles_shared_action(self):
|
|
self._test_validate_rbac_policy_delete_handles_policy(
|
|
{'action': 'unknown_action'})
|
|
|
|
@mock.patch.object(_test_class, 'get_object')
|
|
def test_validate_rbac_policy_delete_skips_db_object_owner(self,
|
|
mock_get_object):
|
|
policy = {'action': rbac_db_models.ACCESS_SHARED,
|
|
'target_tenant': 'fake_tenant_id',
|
|
'object_id': 'fake_obj_id',
|
|
'tenant_id': 'fake_tenant_id'}
|
|
mock_get_object.return_value.tenant_id = policy['target_tenant']
|
|
self._test_validate_rbac_policy_delete_handles_policy(policy)
|
|
|
|
@mock.patch.object(_test_class, 'get_object')
|
|
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
|
|
return_value='tenant_id_shared_with')
|
|
def test_validate_rbac_policy_delete_fails_single_tenant_and_in_use(
|
|
self, get_bound_tenant_ids_mock, mock_get_object):
|
|
policy = {'action': rbac_db_models.ACCESS_SHARED,
|
|
'target_tenant': 'tenant_id_shared_with',
|
|
'tenant_id': 'object_owner_tenant_id',
|
|
'object_id': 'fake_obj_id'}
|
|
context = mock.Mock()
|
|
with mock.patch.object(
|
|
self._test_class,
|
|
'_get_db_obj_rbac_entries') as target_tenants_mock:
|
|
filter_mock = target_tenants_mock.return_value.filter
|
|
filter_mock.return_value.count.return_value = 0
|
|
self.assertRaises(
|
|
ext_rbac.RbacPolicyInUse,
|
|
self._test_class.validate_rbac_policy_delete,
|
|
resource=None,
|
|
event=events.BEFORE_DELETE,
|
|
trigger='dummy_trigger',
|
|
context=context,
|
|
object_type=self._test_class.rbac_db_model.object_type,
|
|
policy=policy)
|
|
|
|
def test_validate_rbac_policy_delete_not_bound_tenant_success(self):
|
|
context = mock.Mock()
|
|
with mock.patch.object(
|
|
self._test_class, 'get_bound_tenant_ids',
|
|
return_value={'fake_tid2', 'fake_tid3'}), \
|
|
mock.patch.object(self._test_class,
|
|
'_get_db_obj_rbac_entries') as get_rbac_entries_mock, \
|
|
mock.patch.object(
|
|
self._test_class,
|
|
'_get_tenants_with_shared_access_to_db_obj') as sh_tids:
|
|
get_rbac_entries_mock.filter.return_value.count.return_value = 0
|
|
self._test_class._validate_rbac_policy_delete(
|
|
context=context,
|
|
obj_id='fake_obj_id',
|
|
target_tenant='fake_tid1')
|
|
sh_tids.assert_not_called()
|
|
|
|
@mock.patch.object(_test_class, '_get_db_obj_rbac_entries')
|
|
@mock.patch.object(_test_class,
|
|
'_get_tenants_with_shared_access_to_db_obj',
|
|
return_value=['some_other_tenant'])
|
|
@mock.patch.object(_test_class, 'get_bound_tenant_ids',
|
|
return_value={'fake_id1'})
|
|
def test_validate_rbac_policy_delete_fails_single_used_wildcarded(
|
|
self, get_bound_tenant_ids_mock, mock_tenants_with_shared_access,
|
|
_get_db_obj_rbac_entries_mock):
|
|
policy = {'action': rbac_db_models.ACCESS_SHARED,
|
|
'target_tenant': '*',
|
|
'tenant_id': 'object_owner_tenant_id',
|
|
'object_id': 'fake_obj_id'}
|
|
context = mock.Mock()
|
|
with mock.patch.object(self._test_class, 'get_object'):
|
|
self.assertRaises(
|
|
ext_rbac.RbacPolicyInUse,
|
|
self._test_class.validate_rbac_policy_delete,
|
|
resource=mock.Mock(),
|
|
event=events.BEFORE_DELETE,
|
|
trigger='dummy_trigger',
|
|
context=context,
|
|
object_type=self._test_class.rbac_db_model.object_type,
|
|
policy=policy)
|
|
|
|
@mock.patch.object(_test_class, 'attach_rbac')
|
|
@mock.patch.object(obj_db_api, 'get_object',
|
|
return_value=['fake_rbac_policy'])
|
|
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
|
|
def test_update_shared_avoid_duplicate_update(
|
|
self, mock_validate_delete, get_object_mock, attach_rbac_mock):
|
|
obj_id = 'fake_obj_id'
|
|
self._test_class(mock.Mock()).update_shared(is_shared_new=True,
|
|
obj_id=obj_id)
|
|
get_object_mock.assert_called_with(
|
|
mock.ANY, self._test_class.rbac_db_model, object_id=obj_id,
|
|
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
|
|
self.assertFalse(mock_validate_delete.called)
|
|
self.assertFalse(attach_rbac_mock.called)
|
|
|
|
@mock.patch.object(_test_class, 'attach_rbac')
|
|
@mock.patch.object(obj_db_api, 'get_object', return_value=[])
|
|
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
|
|
def test_update_shared_wildcard(
|
|
self, mock_validate_delete, get_object_mock, attach_rbac_mock):
|
|
obj_id = 'fake_obj_id'
|
|
|
|
test_neutron_obj = self._test_class(mock.Mock())
|
|
test_neutron_obj.update_shared(is_shared_new=True, obj_id=obj_id)
|
|
get_object_mock.assert_called_with(
|
|
mock.ANY, self._test_class.rbac_db_model, object_id=obj_id,
|
|
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
|
|
|
|
attach_rbac_mock.assert_called_with(
|
|
obj_id, test_neutron_obj._context.tenant_id)
|
|
|
|
@mock.patch.object(_test_class, 'attach_rbac')
|
|
@mock.patch.object(obj_db_api, 'get_object',
|
|
return_value=['fake_rbac_policy'])
|
|
@mock.patch.object(_test_class, '_validate_rbac_policy_delete')
|
|
def test_update_shared_remove_wildcard_sharing(
|
|
self, mock_validate_delete, get_object_mock, attach_rbac_mock):
|
|
obj_id = 'fake_obj_id'
|
|
self._test_class(mock.Mock()).update_shared(is_shared_new=False,
|
|
obj_id=obj_id)
|
|
get_object_mock.assert_called_with(
|
|
mock.ANY, self._test_class.rbac_db_model, object_id=obj_id,
|
|
target_tenant='*', action=rbac_db_models.ACCESS_SHARED)
|
|
|
|
self.assertFalse(attach_rbac_mock.attach_rbac.called)
|
|
mock_validate_delete.assert_called_with(mock.ANY, obj_id, '*')
|
|
|
|
@mock.patch.object(_test_class, 'create_rbac_policy')
|
|
def test_attach_rbac_returns_type(self, create_rbac_mock):
|
|
obj_id = 'fake_obj_id'
|
|
tenant_id = 'fake_tenant_id'
|
|
target_tenant = 'fake_target_tenant'
|
|
self._test_class(mock.Mock()).attach_rbac(obj_id, tenant_id,
|
|
target_tenant)
|
|
rbac_pol = create_rbac_mock.call_args_list[0][0][1]['rbac_policy']
|
|
self.assertEqual(rbac_pol['object_id'], obj_id)
|
|
self.assertEqual(rbac_pol['target_tenant'], target_tenant)
|
|
self.assertEqual(rbac_pol['action'], rbac_db_models.ACCESS_SHARED)
|
|
self.assertEqual(rbac_pol['object_type'],
|
|
self._test_class.rbac_db_model.object_type)
|