Set is_admin flag correctly in RequestContext
Currently is_admin flag is always False. As a result some of the admin operations are not working. For example, quotas-list is not listing all the user quotas. This change sets the flag correctly based on the roles assigned to the user and policies defined in policy.json. Change-Id: I01534ccf1cf1e635282db497e0e026bea19c3bd2 Closes-Bug: #1660843
This commit is contained in:
parent
9ab31e0d03
commit
54152970e1
@ -13,6 +13,8 @@
|
||||
from eventlet.green import threading
|
||||
from oslo_context import context
|
||||
|
||||
from magnum.common import policy
|
||||
|
||||
import magnum.conf
|
||||
|
||||
CONF = magnum.conf.CONF
|
||||
@ -25,7 +27,7 @@ class RequestContext(context.RequestContext):
|
||||
domain_name=None, user_name=None, user_id=None,
|
||||
user_domain_name=None, user_domain_id=None,
|
||||
project_name=None, project_id=None, roles=None,
|
||||
is_admin=False, read_only=False, show_deleted=False,
|
||||
is_admin=None, read_only=False, show_deleted=False,
|
||||
request_id=None, trust_id=None, auth_token_info=None,
|
||||
all_tenants=False, password=None, **kwargs):
|
||||
"""Stores several additional request parameters:
|
||||
@ -60,6 +62,10 @@ class RequestContext(context.RequestContext):
|
||||
self.trust_id = trust_id
|
||||
self.all_tenants = all_tenants
|
||||
self.password = password
|
||||
if is_admin is None:
|
||||
self.is_admin = policy.check_is_admin(self)
|
||||
else:
|
||||
self.is_admin = is_admin
|
||||
|
||||
def to_dict(self):
|
||||
value = super(RequestContext, self).to_dict()
|
||||
|
@ -67,13 +67,7 @@ class KeystoneClientV3(object):
|
||||
return session
|
||||
|
||||
def _get_auth(self):
|
||||
if self.context.is_admin:
|
||||
try:
|
||||
auth = ka_loading.load_auth_from_conf_options(
|
||||
CONF, ksconf.CFG_GROUP)
|
||||
except ka_exception.MissingRequiredOptions:
|
||||
auth = self._get_legacy_auth()
|
||||
elif self.context.auth_token_info:
|
||||
if self.context.auth_token_info:
|
||||
access_info = ka_access.create(body=self.context.auth_token_info,
|
||||
auth_token=self.context.auth_token)
|
||||
auth = ka_access_plugin.AccessInfoPlugin(access_info)
|
||||
@ -91,7 +85,12 @@ class KeystoneClientV3(object):
|
||||
}
|
||||
|
||||
auth = ka_v3.Password(**auth_info)
|
||||
|
||||
elif self.context.is_admin:
|
||||
try:
|
||||
auth = ka_loading.load_auth_from_conf_options(
|
||||
CONF, ksconf.CFG_GROUP)
|
||||
except ka_exception.MissingRequiredOptions:
|
||||
auth = self._get_legacy_auth()
|
||||
else:
|
||||
LOG.error(_LE('Keystone API connection failed: no password, '
|
||||
'trust_id or token found.'))
|
||||
|
@ -18,10 +18,10 @@
|
||||
import decorator
|
||||
from oslo_config import cfg
|
||||
from oslo_policy import policy
|
||||
from oslo_utils import importutils
|
||||
import pecan
|
||||
|
||||
from magnum.common import clients
|
||||
from magnum.common import context
|
||||
from magnum.common import exception
|
||||
|
||||
|
||||
@ -101,6 +101,7 @@ def enforce(context, rule=None, target=None,
|
||||
|
||||
def add_policy_attributes(target):
|
||||
"""Adds extra information for policy enforcement to raw target object"""
|
||||
context = importutils.import_module('magnum.common.context')
|
||||
admin_context = context.make_admin_context()
|
||||
admin_osc = clients.OpenStackClients(admin_context)
|
||||
trustee_domain_id = admin_osc.keystone().trustee_domain_id
|
||||
@ -108,6 +109,16 @@ def add_policy_attributes(target):
|
||||
return target
|
||||
|
||||
|
||||
def check_is_admin(context):
|
||||
"""Whether or not user is admin according to policy setting.
|
||||
|
||||
"""
|
||||
init()
|
||||
target = {}
|
||||
credentials = context.to_dict()
|
||||
return _ENFORCER.enforce('context_is_admin', target, credentials)
|
||||
|
||||
|
||||
def enforce_wsgi(api_name, act=None):
|
||||
"""This is a decorator to simplify wsgi action policy rule check.
|
||||
|
||||
|
@ -70,7 +70,8 @@ class TestCase(base.BaseTestCase):
|
||||
self.context = magnum_context.RequestContext(
|
||||
auth_token_info=token_info,
|
||||
project_id='fake_project',
|
||||
user_id='fake_user')
|
||||
user_id='fake_user',
|
||||
is_admin=False)
|
||||
|
||||
self.global_mocks = {}
|
||||
|
||||
@ -91,6 +92,8 @@ class TestCase(base.BaseTestCase):
|
||||
kwargs['project_id'] = 'fake_project'
|
||||
if not kwargs.get('user_id'):
|
||||
kwargs['user_id'] = 'fake_user'
|
||||
if not kwargs.get('is_admin'):
|
||||
kwargs['is_admin'] = False
|
||||
|
||||
context = magnum_context.RequestContext(*args, **kwargs)
|
||||
return magnum_context.RequestContext.from_dict(context.to_dict())
|
||||
|
@ -92,6 +92,7 @@ class ClientsTest(base.BaseTestCase):
|
||||
con = mock.MagicMock()
|
||||
con.auth_token = None
|
||||
con.auth_token_info = None
|
||||
con.trust_id = None
|
||||
auth_url = mock.PropertyMock(name="auth_url",
|
||||
return_value="keystone_url")
|
||||
type(con).auth_url = auth_url
|
||||
@ -149,6 +150,7 @@ class ClientsTest(base.BaseTestCase):
|
||||
con = mock.MagicMock()
|
||||
con.auth_token = None
|
||||
con.auth_token_info = None
|
||||
con.trust_id = None
|
||||
auth_url = mock.PropertyMock(name="auth_url",
|
||||
return_value="keystone_url")
|
||||
type(con).auth_url = auth_url
|
||||
@ -207,6 +209,7 @@ class ClientsTest(base.BaseTestCase):
|
||||
con = mock.MagicMock()
|
||||
con.auth_token = None
|
||||
con.auth_token_info = None
|
||||
con.trust_id = None
|
||||
auth_url = mock.PropertyMock(name="auth_url",
|
||||
return_value="keystone_url")
|
||||
type(con).auth_url = auth_url
|
||||
@ -268,6 +271,7 @@ class ClientsTest(base.BaseTestCase):
|
||||
con = mock.MagicMock()
|
||||
con.auth_token = None
|
||||
con.auth_token_info = None
|
||||
con.trust_id = None
|
||||
auth_url = mock.PropertyMock(name="auth_url",
|
||||
return_value="keystone_url")
|
||||
type(con).auth_url = auth_url
|
||||
@ -283,6 +287,7 @@ class ClientsTest(base.BaseTestCase):
|
||||
mock_auth.__get__ = mock.Mock(return_value="keystone_url")
|
||||
con = mock.MagicMock()
|
||||
con.auth_token = "3bcc3d3a03f44e3d8377f9247b0ad155"
|
||||
con.auth_token_info = "auth-token-info"
|
||||
con.auth_url = "keystone_url"
|
||||
mock_url.return_value = "url_from_keystone"
|
||||
obj = clients.OpenStackClients(con)
|
||||
@ -329,6 +334,7 @@ class ClientsTest(base.BaseTestCase):
|
||||
con = mock.MagicMock()
|
||||
con.auth_token = None
|
||||
con.auth_token_info = None
|
||||
con.trust_id = None
|
||||
auth_url = mock.PropertyMock(name="auth_url",
|
||||
return_value="keystone_url")
|
||||
type(con).auth_url = auth_url
|
||||
|
@ -70,6 +70,9 @@ class KeystoneClientTest(base.TestCase):
|
||||
|
||||
def test_client_with_password(self, mock_ks):
|
||||
self.ctx.is_admin = True
|
||||
self.ctx.auth_token_info = None
|
||||
self.ctx.auth_token = None
|
||||
self.ctx.trust_id = None
|
||||
ks_client = keystone.KeystoneClientV3(self.ctx)
|
||||
ks_client.client
|
||||
session = ks_client.session
|
||||
@ -81,6 +84,9 @@ class KeystoneClientTest(base.TestCase):
|
||||
@mock.patch('magnum.common.keystone.ka_v3')
|
||||
def test_client_with_password_legacy(self, mock_v3, mock_loading, mock_ks):
|
||||
self.ctx.is_admin = True
|
||||
self.ctx.auth_token_info = None
|
||||
self.ctx.auth_token = None
|
||||
self.ctx.trust_id = None
|
||||
mock_loading.load_auth_from_conf_options.side_effect = \
|
||||
ka_exception.MissingRequiredOptions(mock.MagicMock())
|
||||
ks_client = keystone.KeystoneClientV3(self.ctx)
|
||||
|
45
magnum/tests/unit/common/test_policy.py
Normal file
45
magnum/tests/unit/common/test_policy.py
Normal file
@ -0,0 +1,45 @@
|
||||
# Copyright 2017 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_policy import policy as oslo_policy
|
||||
|
||||
from magnum.common import context as magnum_context
|
||||
from magnum.common import policy
|
||||
|
||||
from magnum.tests import base
|
||||
|
||||
|
||||
class TestPolicy(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPolicy, self).setUp()
|
||||
rules_dict = {"context_is_admin": "role:admin"}
|
||||
self.rules = oslo_policy.Rules.from_dict(rules_dict)
|
||||
|
||||
def test_check_is_admin_with_admin_context_succeeds(self):
|
||||
ctx = magnum_context.RequestContext(user='test-user',
|
||||
project_id='test-project-id',
|
||||
is_admin=True)
|
||||
# explicitly set admin role as this test checks for admin role
|
||||
# with the policy engine
|
||||
ctx.roles = ['admin']
|
||||
self.assertTrue(policy.check_is_admin(ctx))
|
||||
|
||||
def test_check_is_admin_with_user_context_fails(self):
|
||||
ctx = magnum_context.RequestContext(user='test-user',
|
||||
project_id='test-project-id')
|
||||
# there is no admin role set in the context, so check_is_admin
|
||||
# should return False
|
||||
self.assertFalse(policy.check_is_admin(ctx))
|
Loading…
Reference in New Issue
Block a user