Merge "Move list_trusts enforcement to default policies"

This commit is contained in:
Zuul 2019-09-08 05:10:54 +00:00 committed by Gerrit Code Review
commit cdf27685b7
7 changed files with 232 additions and 24 deletions

View File

@ -138,6 +138,8 @@ identity:revocation_list GET /v3/auth/tokens/O
identity:revoke_token DELETE /v3/auth/tokens
identity:create_trust POST /v3/OS-TRUST/trusts
identity:list_trusts GET /v3/OS-TRUST/trusts
identity:list_trusts_for_trustor GET /v3/OS-TRUST/trusts?trustor_user_id={trustor_user_id}
identity:list_trusts_for_trustee GET /v3/OS-TRUST/trusts?trustee_user_id={trustee_user_id}
identity:list_roles_for_trust GET /v3/OS-TRUST/trusts/{trust_id}/roles
identity:get_role_for_trust GET /v3/OS-TRUST/trusts/{trust_id}/roles/{role_id}
identity:delete_trust DELETE /v3/OS-TRUST/trusts/{trust_id}

View File

@ -17,6 +17,8 @@
import flask
import flask_restful
from oslo_log import log
from oslo_policy import _checks as op_checks
from six.moves import http_client
from keystone.api._shared import json_home_relations
@ -24,6 +26,7 @@ from keystone.common import context
from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common.rbac_enforcer import policy
from keystone.common import utils
from keystone.common import validation
from keystone import exception
@ -32,6 +35,7 @@ from keystone.server import flask as ks_flask
from keystone.trust import schema
LOG = log.getLogger(__name__)
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
@ -156,32 +160,49 @@ class TrustResource(ks_flask.ResourceBase):
return self.wrap_member(trust)
def _list_trusts(self):
ENFORCER.enforce_call(action='identity:list_trusts')
trusts = []
trustor_user_id = flask.request.args.get('trustor_user_id')
trustee_user_id = flask.request.args.get('trustee_user_id')
if trustor_user_id:
target = {'trust': {'trustor_user_id': trustor_user_id}}
ENFORCER.enforce_call(action='identity:list_trusts_for_trustor',
target_attr=target)
elif trustee_user_id:
target = {'trust': {'trustee_user_id': trustee_user_id}}
ENFORCER.enforce_call(action='identity:list_trusts_for_trustee',
target_attr=target)
else:
ENFORCER.enforce_call(action='identity:list_trusts')
trusts = []
# NOTE(cmurphy) As of Train, the default policies enforce the
# identity:list_trusts rule and there are new policies in-code to
# enforce identity:list_trusts_for_trustor and
# identity:list_trusts_for_trustee. However, in case the
# identity:list_trusts rule has been locally overridden by the default
# that would have been produced by the sample config, we need to
# enforce it again and warn that the behavior is changing.
rules = policy._ENFORCER._enforcer.rules.get('identity:list_trusts')
# rule check_str is ""
if isinstance(rules, op_checks.TrueCheck):
LOG.warning(
"The policy check string for rule \"identity:list_trusts\" has been overridden"
"to \"always true\". In the next release, this will cause the"
"\"identity:list_trusts\" action to be fully permissive as hardcoded"
"enforcement will be removed. To correct this issue, either stop overriding the"
"\"identity:list_trusts\" rule in config to accept the defaults, or explicitly"
"set a rule that is not empty."
)
if not flask.request.args:
# NOTE(morgan): Admin can list all trusts.
ENFORCER.enforce_call(action='admin_required')
if not flask.request.args:
# NOTE(morgan): Admin can list all trusts.
ENFORCER.enforce_call(action='admin_required')
trusts += PROVIDERS.trust_api.list_trusts()
# TODO(morgan): Convert the trustor/trustee checks into policy
# checkstr we can enforce on. This is duplication of code
# behavior/design as the OS-TRUST controller for ease of review/
# comparison of previous code. Minor optimizations [checks before db
# hits] have been done.
action = _('Cannot list trusts for another user')
if trustor_user_id:
if trustor_user_id != self.oslo_context.user_id:
raise exception.ForbiddenAction(action=action)
if trustee_user_id:
if trustee_user_id != self.oslo_context.user_id:
raise exception.ForbiddenAction(action=action)
trusts += PROVIDERS.trust_api.list_trusts_for_trustor(trustor_user_id)
trusts += PROVIDERS.trust_api.list_trusts_for_trustee(trustee_user_id)
elif trustor_user_id:
trusts += PROVIDERS.trust_api.list_trusts_for_trustor(trustor_user_id)
elif trustee_user_id:
trusts += PROVIDERS.trust_api.list_trusts_for_trustee(trustee_user_id)
for trust in trusts:
# get_trust returns roles, list_trusts does not

View File

@ -10,11 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import _checks
from oslo_policy import policy
from oslo_upgradecheck import upgradecheck
from keystone.common import rbac_enforcer
import keystone.conf
CONF = keystone.conf.CONF
ENFORCER = rbac_enforcer.RBACEnforcer
class Checks(upgradecheck.UpgradeCommands):
@ -26,8 +30,31 @@ class Checks(upgradecheck.UpgradeCommands):
policies actually exist within the roles backend.
"""
pass
def check_trust_policies_are_not_empty(self):
enforcer = policy.Enforcer(CONF)
ENFORCER.register_rules(enforcer)
enforcer.load_rules()
rule = enforcer.rules.get('identity:list_trusts')
if isinstance(rule, _checks.TrueCheck):
return upgradecheck.Result(
upgradecheck.Code.FAILURE,
"Policy check string for \"identity:list_trusts\" is "
"overridden to \"\", \"@\", or []. In the next release, "
"this will cause the \"identity:list_trusts\" action to be "
"fully permissive as hardcoded enforcement will be removed. "
"To correct this issue, either stop overriding this rule in "
"config to accept the defaults, or explicitly set a rule that "
"is not empty."
)
return upgradecheck.Result(
upgradecheck.Code.SUCCESS, "\"identity:list_trusts\" policy is safe")
_upgrade_checks = (
("Check trust policies are not empty",
check_trust_policies_are_not_empty),
)
def main():
keystone.conf.configure()
return upgradecheck.main(CONF, 'keystone', Checks())

View File

@ -14,6 +14,9 @@ from oslo_policy import policy
from keystone.common.policies import base
RULE_TRUSTOR = 'user_id:%(target.trust.trustor_user_id)s'
RULE_TRUSTEE = 'user_id:%(target.trust.trustee_user_id)s'
trust_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'create_trust',
@ -27,13 +30,31 @@ trust_policies = [
'method': 'POST'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_trusts',
check_str='',
check_str=base.RULE_ADMIN_REQUIRED,
scope_types=['project'],
description='List trusts.',
operations=[{'path': '/v3/OS-TRUST/trusts',
'method': 'GET'},
{'path': '/v3/OS-TRUST/trusts',
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_trusts_for_trustor',
check_str=RULE_TRUSTOR,
scope_types=['project'],
description='List trusts for trustor.',
operations=[{'path': '/v3/OS-TRUST/trusts?trustor_user_id={trustor_user_id}',
'method': 'GET'},
{'path': '/v3/OS-TRUST/trusts?trustor_user_id={trustor_user_id}',
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_trusts_for_trustee',
check_str=RULE_TRUSTEE,
scope_types=['project'],
description='List trusts for trustee.',
operations=[{'path': '/v3/OS-TRUST/trusts?trustee_user_id={trustee_user_id}',
'method': 'GET'},
{'path': '/v3/OS-TRUST/trusts?trustee_user_id={trustee_user_id}',
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_roles_for_trust',
check_str='',

View File

@ -12,6 +12,7 @@
import uuid
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common import provider_api
@ -20,6 +21,7 @@ from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
@ -35,7 +37,13 @@ class TrustTests(base_classes.TestCaseWithBootstrap,
def setUp(self):
super(TrustTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
@ -92,6 +100,18 @@ class TrustTests(base_classes.TestCaseWithBootstrap,
self.token_id = r.headers['X-Subject-Token']
self.trustee_headers = {'X-Auth-Token': self.token_id}
def _override_policy_old_defaults(self):
# TODO(cmurphy): This is to simulate what would happen if the operator
# had generated a sample policy config, or had never removed their old
# policy files since we adopted policy in code, and had explicitly
# retained the old "" policy check strings. Remove this once the
# hardcoded enforcement is removed from the trusts API.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:list_trusts': '',
}
f.write(jsonutils.dumps(overridden_policies))
class _AdminTestsMixin(object):
"""Tests for all admin users.
@ -226,6 +246,18 @@ class SystemAdminTests(TrustTests, _AdminTestsMixin):
expected_status_code=http_client.FORBIDDEN
)
def test_admin_list_all_trusts_overridden_defaults(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
r = c.get(
'/v3/OS-TRUST/trusts',
headers=self.headers
)
self.assertEqual(1, len(r.json['trusts']))
class ProjectUserTests(TrustTests):
"""Tests for all project users."""
@ -501,3 +533,67 @@ class ProjectUserTests(TrustTests):
headers=self.other_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_trustor_cannot_list_trusts_for_trustee_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustee_user_id=%s' %
self.trustee_user_id),
headers=self.trustor_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_trustee_cannot_list_trusts_for_trustor_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustor_user_id=%s' %
self.trustor_user_id),
headers=self.trustee_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_trusts_for_other_trustor_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustor_user_id=%s' %
self.trustor_user_id),
headers=self.other_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_trusts_for_other_trustee_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustee_user_id=%s' %
self.trustee_user_id),
headers=self.other_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_all_trusts_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
'/v3/OS-TRUST/trusts',
headers=self.trustee_headers,
expected_status_code=http_client.FORBIDDEN
)

View File

@ -26,6 +26,7 @@ import oslo_config.fixture
from oslo_db.sqlalchemy import migration
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_upgradecheck import upgradecheck
from six.moves import configparser
from six.moves import http_client
from six.moves import range
@ -41,6 +42,7 @@ from keystone.cmd.doctor import ldap
from keystone.cmd.doctor import security_compliance
from keystone.cmd.doctor import tokens
from keystone.cmd.doctor import tokens_fernet
from keystone.cmd import status
from keystone.common import provider_api
from keystone.common.sql import upgrades
import keystone.conf
@ -51,6 +53,7 @@ from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.ksfixtures import ldapdb
from keystone.tests.unit.ksfixtures import policy
from keystone.tests.unit.ksfixtures import temporaryfile
from keystone.tests.unit import mapping_fixtures
@ -1843,3 +1846,39 @@ class TestMappingEngineTester(unit.BaseTestCase):
mapping_engine = cli.MappingEngineTester()
self.assertRaises(exception.ValidationError,
mapping_engine.main)
class CliStatusTestCase(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
super(CliStatusTestCase, self).setUp()
self.load_backends()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
policy.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self.checks = status.Checks()
def test_check_safe_trust_policies(self):
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:list_trusts': ''
}
f.write(jsonutils.dumps(overridden_policies))
result = self.checks.check_trust_policies_are_not_empty()
self.assertEqual(upgradecheck.Code.FAILURE, result.code)
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:list_trusts': 'rule:admin_required'
}
f.write(jsonutils.dumps(overridden_policies))
result = self.checks.check_trust_policies_are_not_empty()
self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
with open(self.policy_file_name, 'w') as f:
overridden_policies = {}
f.write(jsonutils.dumps(overridden_policies))
result = self.checks.check_trust_policies_are_not_empty()
self.assertEqual(upgradecheck.Code.SUCCESS, result.code)

View File

@ -194,6 +194,8 @@ class PolicyJsonTestCase(unit.TestCase):
'identity:revocation_list',
'identity:create_trust',
'identity:list_trusts',
'identity:list_trusts_for_trustor',
'identity:list_trusts_for_trustee',
'identity:list_roles_for_trust',
'identity:get_role_for_trust',
'identity:delete_trust',