Merge "Add Neutron context module and some policy methods"
This commit is contained in:
commit
d88406462e
142
neutron_lib/_context.py
Normal file
142
neutron_lib/_context.py
Normal file
@ -0,0 +1,142 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Context: context for security/db session."""
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
from oslo_context import context as oslo_context
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
# TODO(HenryG): replace db/_api.py with the real db/api.py
|
||||
from neutron_lib.db import _api as db_api
|
||||
from neutron_lib import policy
|
||||
|
||||
|
||||
class ContextBase(oslo_context.RequestContext):
|
||||
"""Security context and request information.
|
||||
|
||||
Represents the user taking a given action within the system.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, user_id, tenant_id, is_admin=None, roles=None,
|
||||
timestamp=None, request_id=None, tenant_name=None,
|
||||
user_name=None, overwrite=True, auth_token=None,
|
||||
is_advsvc=None, **kwargs):
|
||||
"""Object initialization.
|
||||
|
||||
:param overwrite: Set to False to ensure that the greenthread local
|
||||
copy of the index is not overwritten.
|
||||
|
||||
:param kwargs: Extra arguments that might be present, but we ignore
|
||||
because they possibly came in from older rpc messages.
|
||||
"""
|
||||
super(ContextBase, self).__init__(auth_token=auth_token,
|
||||
user=user_id, tenant=tenant_id,
|
||||
is_admin=is_admin,
|
||||
request_id=request_id,
|
||||
overwrite=overwrite,
|
||||
roles=roles)
|
||||
self.user_name = user_name
|
||||
self.tenant_name = tenant_name
|
||||
|
||||
if not timestamp:
|
||||
timestamp = datetime.datetime.utcnow()
|
||||
self.timestamp = timestamp
|
||||
self.is_advsvc = is_advsvc
|
||||
if self.is_advsvc is None:
|
||||
self.is_advsvc = self.is_admin or policy.check_is_advsvc(self)
|
||||
if self.is_admin is None:
|
||||
self.is_admin = policy.check_is_admin(self)
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
return self.tenant
|
||||
|
||||
@property
|
||||
def tenant_id(self):
|
||||
return self.tenant
|
||||
|
||||
@tenant_id.setter
|
||||
def tenant_id(self, tenant_id):
|
||||
self.tenant = tenant_id
|
||||
|
||||
@property
|
||||
def user_id(self):
|
||||
return self.user
|
||||
|
||||
@user_id.setter
|
||||
def user_id(self, user_id):
|
||||
self.user = user_id
|
||||
|
||||
def to_dict(self):
|
||||
context = super(ContextBase, self).to_dict()
|
||||
context.update({
|
||||
'user_id': self.user_id,
|
||||
'tenant_id': self.tenant_id,
|
||||
'project_id': self.project_id,
|
||||
'timestamp': str(self.timestamp),
|
||||
'tenant_name': self.tenant_name,
|
||||
'project_name': self.tenant_name,
|
||||
'user_name': self.user_name,
|
||||
})
|
||||
return context
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, values):
|
||||
return cls(**values)
|
||||
|
||||
def elevated(self):
|
||||
"""Return a version of this context with admin flag set."""
|
||||
context = copy.copy(self)
|
||||
context.is_admin = True
|
||||
|
||||
if 'admin' not in [x.lower() for x in context.roles]:
|
||||
context.roles = context.roles + ["admin"]
|
||||
|
||||
return context
|
||||
|
||||
|
||||
@enginefacade.transaction_context_provider
|
||||
class ContextBaseWithSession(ContextBase):
|
||||
pass
|
||||
|
||||
|
||||
class Context(ContextBaseWithSession):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Context, self).__init__(*args, **kwargs)
|
||||
self._session = None
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
# TODO(akamyshnikova): checking for session attribute won't be needed
|
||||
# when reader and writer will be used
|
||||
if hasattr(super(Context, self), 'session'):
|
||||
return super(Context, self).session
|
||||
if self._session is None:
|
||||
self._session = db_api.get_session()
|
||||
return self._session
|
||||
|
||||
|
||||
def get_admin_context():
|
||||
return Context(user_id=None,
|
||||
tenant_id=None,
|
||||
is_admin=True,
|
||||
overwrite=False)
|
||||
|
||||
|
||||
def get_admin_context_without_session():
|
||||
return ContextBase(user_id=None,
|
||||
tenant_id=None,
|
||||
is_admin=True)
|
40
neutron_lib/db/_api.py
Normal file
40
neutron_lib/db/_api.py
Normal file
@ -0,0 +1,40 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
TEMPORARY: use the old EngineFacade and lazy init.
|
||||
|
||||
TODO(HenryG): replace this file with the new db/api.py from neutron.
|
||||
"""
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import session
|
||||
|
||||
_FACADE = None
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
|
||||
# NOTE: This is going to change with bug 1520719
|
||||
if _FACADE is None:
|
||||
_FACADE = session.EngineFacade.from_config(cfg.CONF, sqlite_fk=True)
|
||||
|
||||
return _FACADE
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False, use_slave=False):
|
||||
"""Helper method to grab session."""
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_session(autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit,
|
||||
use_slave=use_slave)
|
@ -240,3 +240,11 @@ class NetworkTunnelRangeError(NeutronException):
|
||||
if isinstance(kwargs['tunnel_range'], tuple):
|
||||
kwargs['tunnel_range'] = "%d:%d" % kwargs['tunnel_range']
|
||||
super(NetworkTunnelRangeError, self).__init__(**kwargs)
|
||||
|
||||
|
||||
class PolicyInitError(NeutronException):
|
||||
message = _("Failed to initialize policy %(policy)s because %(reason)s.")
|
||||
|
||||
|
||||
class PolicyCheckError(NeutronException):
|
||||
message = _("Failed to check policy %(policy)s because %(reason)s.")
|
||||
|
61
neutron_lib/policy.py
Normal file
61
neutron_lib/policy.py
Normal file
@ -0,0 +1,61 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_policy import policy
|
||||
|
||||
|
||||
_ENFORCER = None
|
||||
_ADMIN_CTX_POLICY = 'context_is_admin'
|
||||
_ADVSVC_CTX_POLICY = 'context_is_advsvc'
|
||||
|
||||
|
||||
def reset():
|
||||
global _ENFORCER
|
||||
if _ENFORCER:
|
||||
_ENFORCER.clear()
|
||||
_ENFORCER = None
|
||||
|
||||
|
||||
def init(conf=cfg.CONF, policy_file=None):
|
||||
"""Init an instance of the Enforcer class."""
|
||||
|
||||
global _ENFORCER
|
||||
if not _ENFORCER:
|
||||
_ENFORCER = policy.Enforcer(conf, policy_file=policy_file)
|
||||
_ENFORCER.load_rules(True)
|
||||
|
||||
|
||||
def refresh(policy_file=None):
|
||||
"""Reset policy and init a new instance of Enforcer."""
|
||||
reset()
|
||||
init(policy_file=policy_file)
|
||||
|
||||
|
||||
def check_is_admin(context):
|
||||
"""Verify context has admin rights according to policy settings."""
|
||||
init()
|
||||
# the target is user-self
|
||||
credentials = context.to_dict()
|
||||
if _ADMIN_CTX_POLICY not in _ENFORCER.rules:
|
||||
return False
|
||||
return _ENFORCER.enforce(_ADMIN_CTX_POLICY, credentials, credentials)
|
||||
|
||||
|
||||
def check_is_advsvc(context):
|
||||
"""Verify context has advsvc rights according to policy settings."""
|
||||
init()
|
||||
# the target is user-self
|
||||
credentials = context.to_dict()
|
||||
if _ADVSVC_CTX_POLICY not in _ENFORCER.rules:
|
||||
return False
|
||||
return _ENFORCER.enforce(_ADVSVC_CTX_POLICY, credentials, credentials)
|
@ -23,11 +23,13 @@ import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_db import options as db_options
|
||||
from oslo_utils import strutils
|
||||
import pbr.version
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from neutron_lib import constants
|
||||
|
||||
from neutron_lib.tests import _post_mortem_debug as post_mortem_debug
|
||||
from neutron_lib.tests import _tools as tools
|
||||
|
||||
@ -99,6 +101,19 @@ class AttributeDict(dict):
|
||||
|
||||
class BaseTestCase(testtools.TestCase):
|
||||
|
||||
@staticmethod
|
||||
def config_parse(conf=None, args=None):
|
||||
"""Create the default configurations."""
|
||||
if args is None:
|
||||
args = []
|
||||
args += ['--config-file', etcdir('neutron_lib.conf')]
|
||||
if conf is None:
|
||||
version_info = pbr.version.VersionInfo('neutron-lib')
|
||||
cfg.CONF(args=args, project='neutron_lib',
|
||||
version='%%(prog)s %s' % version_info.release_string())
|
||||
else:
|
||||
conf(args)
|
||||
|
||||
def setUp(self):
|
||||
super(BaseTestCase, self).setUp()
|
||||
|
||||
@ -110,6 +125,12 @@ class BaseTestCase(testtools.TestCase):
|
||||
sqlite_db='', max_pool_size=10,
|
||||
max_overflow=20, pool_timeout=10)
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'oslo_config.cfg.find_config_files',
|
||||
lambda project=None, prog=None, extension=None: []))
|
||||
|
||||
self.setup_config()
|
||||
|
||||
# Configure this first to ensure pm debugging support for setUp()
|
||||
debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
|
||||
if debugger:
|
||||
@ -195,3 +216,7 @@ class BaseTestCase(testtools.TestCase):
|
||||
self.assertEqual(v, actual_superset[k],
|
||||
"Key %(key)s expected: %(exp)r, actual %(act)r" %
|
||||
{'key': k, 'exp': v, 'act': actual_superset[k]})
|
||||
|
||||
def setup_config(self, args=None):
|
||||
"""Tests that need a non-default config can override this method."""
|
||||
self.config_parse(args=args)
|
||||
|
8
neutron_lib/tests/etc/neutron_lib.conf
Normal file
8
neutron_lib/tests/etc/neutron_lib.conf
Normal file
@ -0,0 +1,8 @@
|
||||
[DEFAULT]
|
||||
# Show debugging output in logs (sets DEBUG log level output)
|
||||
debug = False
|
||||
|
||||
lock_path = $state_path/lock
|
||||
|
||||
[database]
|
||||
connection = 'sqlite://'
|
2
neutron_lib/tests/etc/no_policy.json
Normal file
2
neutron_lib/tests/etc/no_policy.json
Normal file
@ -0,0 +1,2 @@
|
||||
{
|
||||
}
|
5
neutron_lib/tests/etc/policy.json
Normal file
5
neutron_lib/tests/etc/policy.json
Normal file
@ -0,0 +1,5 @@
|
||||
{
|
||||
"context_is_admin": "role:admin",
|
||||
"context_is_advsvc": "role:advsvc",
|
||||
"default": "rule:admin_or_owner"
|
||||
}
|
183
neutron_lib/tests/unit/test_context.py
Normal file
183
neutron_lib/tests/unit/test_context.py
Normal file
@ -0,0 +1,183 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_context import context as oslo_context
|
||||
from testtools import matchers
|
||||
|
||||
from neutron_lib import _context
|
||||
from neutron_lib.tests import _base
|
||||
|
||||
|
||||
class TestNeutronContext(_base.BaseTestCase):
|
||||
|
||||
def test_neutron_context_create(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
self.assertEqual('user_id', ctx.user_id)
|
||||
self.assertEqual('tenant_id', ctx.project_id)
|
||||
self.assertEqual('tenant_id', ctx.tenant_id)
|
||||
request_id = ctx.request_id
|
||||
if isinstance(request_id, bytes):
|
||||
request_id = request_id.decode('utf-8')
|
||||
self.assertThat(request_id, matchers.StartsWith('req-'))
|
||||
self.assertEqual('user_id', ctx.user)
|
||||
self.assertEqual('tenant_id', ctx.tenant)
|
||||
self.assertIsNone(ctx.user_name)
|
||||
self.assertIsNone(ctx.tenant_name)
|
||||
self.assertIsNone(ctx.auth_token)
|
||||
|
||||
def test_neutron_context_getter_setter(self):
|
||||
ctx = _context.Context('Anakin', 'Skywalker')
|
||||
self.assertEqual('Anakin', ctx.user_id)
|
||||
self.assertEqual('Skywalker', ctx.tenant_id)
|
||||
ctx.user_id = 'Darth'
|
||||
ctx.tenant_id = 'Vader'
|
||||
self.assertEqual('Darth', ctx.user_id)
|
||||
self.assertEqual('Vader', ctx.tenant_id)
|
||||
|
||||
def test_neutron_context_create_with_name(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id',
|
||||
tenant_name='tenant_name',
|
||||
user_name='user_name')
|
||||
# Check name is set
|
||||
self.assertEqual('user_name', ctx.user_name)
|
||||
self.assertEqual('tenant_name', ctx.tenant_name)
|
||||
# Check user/tenant contains its ID even if user/tenant_name is passed
|
||||
self.assertEqual('user_id', ctx.user)
|
||||
self.assertEqual('tenant_id', ctx.tenant)
|
||||
|
||||
def test_neutron_context_create_with_request_id(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
|
||||
self.assertEqual('req_id_xxx', ctx.request_id)
|
||||
|
||||
def test_neutron_context_create_with_timestamp(self):
|
||||
now = "Right Now!"
|
||||
ctx = _context.Context('user_id', 'tenant_id', timestamp=now)
|
||||
self.assertEqual(now, ctx.timestamp)
|
||||
|
||||
def test_neutron_context_create_is_advsvc(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id', is_advsvc=True)
|
||||
self.assertFalse(ctx.is_admin)
|
||||
self.assertTrue(ctx.is_advsvc)
|
||||
|
||||
def test_neutron_context_create_with_auth_token(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id',
|
||||
auth_token='auth_token_xxx')
|
||||
self.assertEqual('auth_token_xxx', ctx.auth_token)
|
||||
|
||||
def test_neutron_context_from_dict(self):
|
||||
owner = {'user_id': 'Luke', 'tenant_id': 'Skywalker'}
|
||||
ctx = _context.Context.from_dict(owner)
|
||||
self.assertEqual(owner['user_id'], ctx.user_id)
|
||||
self.assertEqual(owner['tenant_id'], ctx.tenant_id)
|
||||
|
||||
def test_neutron_context_to_dict(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
ctx_dict = ctx.to_dict()
|
||||
self.assertEqual('user_id', ctx_dict['user_id'])
|
||||
self.assertEqual('tenant_id', ctx_dict['project_id'])
|
||||
self.assertEqual(ctx.request_id, ctx_dict['request_id'])
|
||||
self.assertEqual('user_id', ctx_dict['user'])
|
||||
self.assertEqual('tenant_id', ctx_dict['tenant'])
|
||||
self.assertIsNone(ctx_dict['user_name'])
|
||||
self.assertIsNone(ctx_dict['tenant_name'])
|
||||
self.assertIsNone(ctx_dict['project_name'])
|
||||
self.assertIsNone(ctx_dict['auth_token'])
|
||||
|
||||
def test_neutron_context_to_dict_with_name(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id',
|
||||
tenant_name='tenant_name',
|
||||
user_name='user_name')
|
||||
ctx_dict = ctx.to_dict()
|
||||
self.assertEqual('user_name', ctx_dict['user_name'])
|
||||
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
|
||||
self.assertEqual('tenant_name', ctx_dict['project_name'])
|
||||
|
||||
def test_neutron_context_to_dict_with_auth_token(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id',
|
||||
auth_token='auth_token_xxx')
|
||||
ctx_dict = ctx.to_dict()
|
||||
self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
|
||||
|
||||
def test_neutron_context_admin_to_dict(self):
|
||||
ctx = _context.get_admin_context()
|
||||
ctx_dict = ctx.to_dict()
|
||||
self.assertIsNone(ctx_dict['user_id'])
|
||||
self.assertIsNone(ctx_dict['tenant_id'])
|
||||
self.assertIsNone(ctx_dict['auth_token'])
|
||||
self.assertTrue(ctx_dict['is_admin'])
|
||||
self.assertIsNotNone(ctx.session)
|
||||
self.assertNotIn('session', ctx_dict)
|
||||
|
||||
def test_neutron_context_admin_without_session_to_dict(self):
|
||||
ctx = _context.get_admin_context_without_session()
|
||||
ctx_dict = ctx.to_dict()
|
||||
self.assertIsNone(ctx_dict['user_id'])
|
||||
self.assertIsNone(ctx_dict['tenant_id'])
|
||||
self.assertIsNone(ctx_dict['auth_token'])
|
||||
self.assertFalse(hasattr(ctx, 'session'))
|
||||
|
||||
def test_neutron_context_elevated_retains_request_id(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
self.assertFalse(ctx.is_admin)
|
||||
req_id_before = ctx.request_id
|
||||
|
||||
elevated_ctx = ctx.elevated()
|
||||
self.assertTrue(elevated_ctx.is_admin)
|
||||
self.assertEqual(req_id_before, elevated_ctx.request_id)
|
||||
|
||||
def test_neutron_context_elevated_idempotent(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
self.assertFalse(ctx.is_admin)
|
||||
elevated_ctx = ctx.elevated()
|
||||
self.assertTrue(elevated_ctx.is_admin)
|
||||
elevated2_ctx = elevated_ctx.elevated()
|
||||
self.assertTrue(elevated2_ctx.is_admin)
|
||||
|
||||
def test_neutron_context_overwrite(self):
|
||||
ctx1 = _context.Context('user_id', 'tenant_id')
|
||||
self.assertEqual(ctx1.request_id,
|
||||
oslo_context.get_current().request_id)
|
||||
|
||||
# If overwrite is not specified, request_id should be updated.
|
||||
ctx2 = _context.Context('user_id', 'tenant_id')
|
||||
self.assertNotEqual(ctx2.request_id, ctx1.request_id)
|
||||
self.assertEqual(ctx2.request_id,
|
||||
oslo_context.get_current().request_id)
|
||||
|
||||
# If overwrite is specified, request_id should be kept.
|
||||
ctx3 = _context.Context('user_id', 'tenant_id', overwrite=False)
|
||||
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
|
||||
self.assertEqual(ctx2.request_id,
|
||||
oslo_context.get_current().request_id)
|
||||
|
||||
def test_neutron_context_get_admin_context_not_update_local_store(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
req_id_before = oslo_context.get_current().request_id
|
||||
self.assertEqual(ctx.request_id, req_id_before)
|
||||
|
||||
ctx_admin = _context.get_admin_context()
|
||||
self.assertEqual(req_id_before, oslo_context.get_current().request_id)
|
||||
self.assertNotEqual(req_id_before, ctx_admin.request_id)
|
||||
|
||||
@mock.patch.object(_context.ContextBaseWithSession, 'session')
|
||||
def test_superclass_session(self, mocked_session):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
# make sure context uses parent class session that is mocked
|
||||
self.assertEqual(mocked_session, ctx.session)
|
||||
|
||||
def test_session_cached(self):
|
||||
ctx = _context.Context('user_id', 'tenant_id')
|
||||
session1 = ctx.session
|
||||
session2 = ctx.session
|
||||
self.assertIs(session1, session2)
|
68
neutron_lib/tests/unit/test_policy.py
Normal file
68
neutron_lib/tests/unit/test_policy.py
Normal file
@ -0,0 +1,68 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron_lib import _context
|
||||
from neutron_lib import policy
|
||||
|
||||
from neutron_lib.tests import _base as base
|
||||
|
||||
|
||||
class TestPolicyEnforcer(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestPolicyEnforcer, self).setUp()
|
||||
# Isolate one _ENFORCER per test case
|
||||
mock.patch.object(policy, '_ENFORCER', None).start()
|
||||
|
||||
def test_init_reset_refresh(self):
|
||||
self.assertIsNone(policy._ENFORCER)
|
||||
policy.init()
|
||||
self.assertIsNotNone(policy._ENFORCER)
|
||||
policy.reset()
|
||||
self.assertIsNone(policy._ENFORCER)
|
||||
policy.refresh()
|
||||
self.assertIsNotNone(policy._ENFORCER)
|
||||
|
||||
def test_check_user_is_not_admin(self):
|
||||
ctx = _context.Context('me', 'my_project')
|
||||
self.assertFalse(policy.check_is_admin(ctx))
|
||||
|
||||
def test_check_user_elevated_is_admin(self):
|
||||
ctx = _context.Context('me', 'my_project', roles=['user']).elevated()
|
||||
self.assertTrue(policy.check_is_admin(ctx))
|
||||
|
||||
def test_check_is_admin_no_roles_no_admin(self):
|
||||
policy.init(policy_file='no_policy.json')
|
||||
ctx = _context.Context('me', 'my_project', roles=['user']).elevated()
|
||||
# With no admin role, elevated() should not work.
|
||||
self.assertFalse(policy.check_is_admin(ctx))
|
||||
|
||||
def test_check_is_advsvc_role(self):
|
||||
ctx = _context.Context('me', 'my_project', roles=['advsvc'])
|
||||
self.assertTrue(policy.check_is_advsvc(ctx))
|
||||
|
||||
def test_check_is_not_advsvc_user(self):
|
||||
ctx = _context.Context('me', 'my_project', roles=['user'])
|
||||
self.assertFalse(policy.check_is_advsvc(ctx))
|
||||
|
||||
def test_check_is_not_advsvc_admin(self):
|
||||
ctx = _context.Context('me', 'my_project').elevated()
|
||||
self.assertTrue(policy.check_is_admin(ctx))
|
||||
self.assertFalse(policy.check_is_advsvc(ctx))
|
||||
|
||||
def test_check_is_advsvc_no_roles_no_advsvc(self):
|
||||
policy.init(policy_file='no_policy.json')
|
||||
ctx = _context.Context('me', 'my_project', roles=['advsvc'])
|
||||
# No advsvc role in the policy file, so cannot assume the role.
|
||||
self.assertFalse(policy.check_is_advsvc(ctx))
|
@ -7,8 +7,11 @@ Babel>=2.3.4 # BSD
|
||||
|
||||
debtcollector>=1.2.0 # Apache-2.0
|
||||
oslo.config>=3.12.0 # Apache-2.0
|
||||
oslo.context>=2.4.0,!=2.6.0 # Apache-2.0
|
||||
oslo.db>=4.1.0 # Apache-2.0
|
||||
oslo.i18n>=2.1.0 # Apache-2.0
|
||||
oslo.log>=1.14.0 # Apache-2.0
|
||||
oslo.messaging>=5.2.0 # Apache-2.0
|
||||
oslo.policy>=1.9.0 # Apache-2.0
|
||||
oslo.service>=1.10.0 # Apache-2.0
|
||||
oslo.utils>=3.16.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user