Move list_trusts enforcement to default policies

Without this change, policy enforcement for the GET /OS-TRUST/trusts API
is hardcoded in the flask dispatcher code. This is a problem because
this enforcement can't be controlled by the operator, as is the norm.
Moreover, it makes the transition to system-scope and
default-roles-aware policies more difficult because there's no sensible
migration from "" to a logical role-based check string.

This patch starts the conversion from hardcoded enforcement to
enforcement via default policies for GET /OS-TRUST/trusts. To do this,
we add two new policy rules, "identity:list_trusts_for_trustor" and
"identity:list_trusts_for_trustee". We need to do this so that we can
keep backwards compatibility with the bizarre behavior that an admin can
list all trusts (GET /OS-TRUST/trusts) but not list trusts for a trustor
or trustee (GET /OS-TRUST/trusts?trustor_user_id={} and
GET/OS-TRUST/trusts?trustee_user_id={}). The tricky part is that it's
plausible that operators may have incorporated the hardcoded empty
default for "identity:list_trusts" into their on-disk policy
configuration, either by never removing the old default policy file that
used to come packaged with keystone, or by generating a sample file and
applying that to disk (we don't recommend that but we don't expressly
forbid or discourage it either). To overcome
this, the trust API code checks whether the "identity:list_trusts" rule
is "" and re-applies the enforcement with a warning. We don't need to do
this for the two new policies because they are initially enforced
in-code and an operator would have to take explicit action on upgrade to
override them.

This change does not use the formal oslo.policy deprecation system
because "" OR'd with the new default is entirely useless as a policy.

Partial-bug: #1818850
Partial-bug: #1818846

Change-Id: I6c1a4ecd756519f7f807c9d28960482e7f0d235b
This commit is contained in:
Colleen Murphy 2019-08-11 21:06:36 -07:00 committed by Colleen Murphy
parent 6a3648e782
commit 7717ed38d9
7 changed files with 232 additions and 24 deletions

View File

@ -138,6 +138,8 @@ identity:revocation_list GET /v3/auth/tokens/O
identity:revoke_token DELETE /v3/auth/tokens
identity:create_trust POST /v3/OS-TRUST/trusts
identity:list_trusts GET /v3/OS-TRUST/trusts
identity:list_trusts_for_trustor GET /v3/OS-TRUST/trusts?trustor_user_id={trustor_user_id}
identity:list_trusts_for_trustee GET /v3/OS-TRUST/trusts?trustee_user_id={trustee_user_id}
identity:list_roles_for_trust GET /v3/OS-TRUST/trusts/{trust_id}/roles
identity:get_role_for_trust GET /v3/OS-TRUST/trusts/{trust_id}/roles/{role_id}
identity:delete_trust DELETE /v3/OS-TRUST/trusts/{trust_id}

View File

@ -17,6 +17,8 @@
import flask
import flask_restful
from oslo_log import log
from oslo_policy import _checks as op_checks
from six.moves import http_client
from keystone.api._shared import json_home_relations
@ -24,6 +26,7 @@ from keystone.common import context
from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common.rbac_enforcer import policy
from keystone.common import utils
from keystone.common import validation
from keystone import exception
@ -32,6 +35,7 @@ from keystone.server import flask as ks_flask
from keystone.trust import schema
LOG = log.getLogger(__name__)
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
@ -162,32 +166,49 @@ class TrustResource(ks_flask.ResourceBase):
return self.wrap_member(trust)
def _list_trusts(self):
ENFORCER.enforce_call(action='identity:list_trusts')
trusts = []
trustor_user_id = flask.request.args.get('trustor_user_id')
trustee_user_id = flask.request.args.get('trustee_user_id')
if not trustor_user_id and not trustee_user_id:
ENFORCER.enforce_call(action='identity:list_trusts')
elif trustor_user_id:
target = {'trust': {'trustor_user_id': trustor_user_id}}
ENFORCER.enforce_call(action='identity:list_trusts_for_trustor',
target_attr=target)
elif trustee_user_id:
target = {'trust': {'trustee_user_id': trustee_user_id}}
ENFORCER.enforce_call(action='identity:list_trusts_for_trustee',
target_attr=target)
trusts = []
# NOTE(cmurphy) As of Train, the default policies enforce the
# identity:list_trusts rule and there are new policies in-code to
# enforce identity:list_trusts_for_trustor and
# identity:list_trusts_for_trustee. However, in case the
# identity:list_trusts rule has been locally overridden by the default
# that would have been produced by the sample config, we need to
# enforce it again and warn that the behavior is changing.
rules = policy._ENFORCER._enforcer.rules.get('identity:list_trusts')
# rule check_str is ""
if isinstance(rules, op_checks.TrueCheck):
LOG.warning(
"The policy check string for rule \"identity:list_trusts\" has been overridden"
"to \"always true\". In the next release, this will cause the"
"\"identity:list_trusts\" action to be fully permissive as hardcoded"
"enforcement will be removed. To correct this issue, either stop overriding the"
"\"identity:list_trusts\" rule in config to accept the defaults, or explicitly"
"set a rule that is not empty."
)
if not flask.request.args:
# NOTE(morgan): Admin can list all trusts.
ENFORCER.enforce_call(action='admin_required')
if not flask.request.args:
# NOTE(morgan): Admin can list all trusts.
ENFORCER.enforce_call(action='admin_required')
trusts += PROVIDERS.trust_api.list_trusts()
# TODO(morgan): Convert the trustor/trustee checks into policy
# checkstr we can enforce on. This is duplication of code
# behavior/design as the OS-TRUST controller for ease of review/
# comparison of previous code. Minor optimizations [checks before db
# hits] have been done.
action = _('Cannot list trusts for another user')
if trustor_user_id:
if trustor_user_id != self.oslo_context.user_id:
raise exception.ForbiddenAction(action=action)
if trustee_user_id:
if trustee_user_id != self.oslo_context.user_id:
raise exception.ForbiddenAction(action=action)
trusts += PROVIDERS.trust_api.list_trusts_for_trustor(trustor_user_id)
trusts += PROVIDERS.trust_api.list_trusts_for_trustee(trustee_user_id)
elif trustor_user_id:
trusts += PROVIDERS.trust_api.list_trusts_for_trustor(trustor_user_id)
elif trustee_user_id:
trusts += PROVIDERS.trust_api.list_trusts_for_trustee(trustee_user_id)
for trust in trusts:
# get_trust returns roles, list_trusts does not

View File

@ -10,11 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import _checks
from oslo_policy import policy
from oslo_upgradecheck import upgradecheck
from keystone.common import rbac_enforcer
import keystone.conf
CONF = keystone.conf.CONF
ENFORCER = rbac_enforcer.RBACEnforcer
class Checks(upgradecheck.UpgradeCommands):
@ -26,8 +30,31 @@ class Checks(upgradecheck.UpgradeCommands):
policies actually exist within the roles backend.
"""
pass
def check_trust_policies_are_not_empty(self):
enforcer = policy.Enforcer(CONF)
ENFORCER.register_rules(enforcer)
enforcer.load_rules()
rule = enforcer.rules.get('identity:list_trusts')
if isinstance(rule, _checks.TrueCheck):
return upgradecheck.Result(
upgradecheck.Code.FAILURE,
"Policy check string for \"identity:list_trusts\" is "
"overridden to \"\", \"@\", or []. In the next release, "
"this will cause the \"identity:list_trusts\" action to be "
"fully permissive as hardcoded enforcement will be removed. "
"To correct this issue, either stop overriding this rule in "
"config to accept the defaults, or explicitly set a rule that "
"is not empty."
)
return upgradecheck.Result(
upgradecheck.Code.SUCCESS, "\"identity:list_trusts\" policy is safe")
_upgrade_checks = (
("Check trust policies are not empty",
check_trust_policies_are_not_empty),
)
def main():
keystone.conf.configure()
return upgradecheck.main(CONF, 'keystone', Checks())

View File

@ -14,6 +14,9 @@ from oslo_policy import policy
from keystone.common.policies import base
RULE_TRUSTOR = 'user_id:%(target.trust.trustor_user_id)s'
RULE_TRUSTEE = 'user_id:%(target.trust.trustee_user_id)s'
trust_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'create_trust',
@ -27,13 +30,31 @@ trust_policies = [
'method': 'POST'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_trusts',
check_str='',
check_str=base.RULE_ADMIN_REQUIRED,
scope_types=['project'],
description='List trusts.',
operations=[{'path': '/v3/OS-TRUST/trusts',
'method': 'GET'},
{'path': '/v3/OS-TRUST/trusts',
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_trusts_for_trustor',
check_str=RULE_TRUSTOR,
scope_types=['project'],
description='List trusts for trustor.',
operations=[{'path': '/v3/OS-TRUST/trusts?trustor_user_id={trustor_user_id}',
'method': 'GET'},
{'path': '/v3/OS-TRUST/trusts?trustor_user_id={trustor_user_id}',
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_trusts_for_trustee',
check_str=RULE_TRUSTEE,
scope_types=['project'],
description='List trusts for trustee.',
operations=[{'path': '/v3/OS-TRUST/trusts?trustee_user_id={trustee_user_id}',
'method': 'GET'},
{'path': '/v3/OS-TRUST/trusts?trustee_user_id={trustee_user_id}',
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_roles_for_trust',
check_str='',

View File

@ -12,6 +12,7 @@
import uuid
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common import provider_api
@ -20,6 +21,7 @@ from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
@ -35,7 +37,13 @@ class TrustTests(base_classes.TestCaseWithBootstrap,
def setUp(self):
super(TrustTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
@ -92,6 +100,18 @@ class TrustTests(base_classes.TestCaseWithBootstrap,
self.token_id = r.headers['X-Subject-Token']
self.trustee_headers = {'X-Auth-Token': self.token_id}
def _override_policy_old_defaults(self):
# TODO(cmurphy): This is to simulate what would happen if the operator
# had generated a sample policy config, or had never removed their old
# policy files since we adopted policy in code, and had explicitly
# retained the old "" policy check strings. Remove this once the
# hardcoded enforcement is removed from the trusts API.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:list_trusts': '',
}
f.write(jsonutils.dumps(overridden_policies))
class _AdminTestsMixin(object):
"""Tests for all admin users.
@ -226,6 +246,18 @@ class SystemAdminTests(TrustTests, _AdminTestsMixin):
expected_status_code=http_client.FORBIDDEN
)
def test_admin_list_all_trusts_overridden_defaults(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
r = c.get(
'/v3/OS-TRUST/trusts',
headers=self.headers
)
self.assertEqual(1, len(r.json['trusts']))
class ProjectUserTests(TrustTests):
"""Tests for all project users."""
@ -501,3 +533,67 @@ class ProjectUserTests(TrustTests):
headers=self.other_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_trustor_cannot_list_trusts_for_trustee_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustee_user_id=%s' %
self.trustee_user_id),
headers=self.trustor_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_trustee_cannot_list_trusts_for_trustor_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustor_user_id=%s' %
self.trustor_user_id),
headers=self.trustee_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_trusts_for_other_trustor_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustor_user_id=%s' %
self.trustor_user_id),
headers=self.other_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_trusts_for_other_trustee_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
('/v3/OS-TRUST/trusts?trustee_user_id=%s' %
self.trustee_user_id),
headers=self.other_headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_all_trusts_overridden_default(self):
self._override_policy_old_defaults()
PROVIDERS.trust_api.create_trust(
self.trust_id, **self.trust_data)
with self.test_client() as c:
c.get(
'/v3/OS-TRUST/trusts',
headers=self.trustee_headers,
expected_status_code=http_client.FORBIDDEN
)

View File

@ -26,6 +26,7 @@ import oslo_config.fixture
from oslo_db.sqlalchemy import migration
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_upgradecheck import upgradecheck
from six.moves import configparser
from six.moves import http_client
from six.moves import range
@ -41,6 +42,7 @@ from keystone.cmd.doctor import ldap
from keystone.cmd.doctor import security_compliance
from keystone.cmd.doctor import tokens
from keystone.cmd.doctor import tokens_fernet
from keystone.cmd import status
from keystone.common import provider_api
from keystone.common.sql import upgrades
import keystone.conf
@ -51,6 +53,7 @@ from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.ksfixtures import ldapdb
from keystone.tests.unit.ksfixtures import policy
from keystone.tests.unit.ksfixtures import temporaryfile
from keystone.tests.unit import mapping_fixtures
@ -1843,3 +1846,39 @@ class TestMappingEngineTester(unit.BaseTestCase):
mapping_engine = cli.MappingEngineTester()
self.assertRaises(exception.ValidationError,
mapping_engine.main)
class CliStatusTestCase(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
super(CliStatusTestCase, self).setUp()
self.load_backends()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
policy.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self.checks = status.Checks()
def test_check_safe_trust_policies(self):
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:list_trusts': ''
}
f.write(jsonutils.dumps(overridden_policies))
result = self.checks.check_trust_policies_are_not_empty()
self.assertEqual(upgradecheck.Code.FAILURE, result.code)
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:list_trusts': 'rule:admin_required'
}
f.write(jsonutils.dumps(overridden_policies))
result = self.checks.check_trust_policies_are_not_empty()
self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
with open(self.policy_file_name, 'w') as f:
overridden_policies = {}
f.write(jsonutils.dumps(overridden_policies))
result = self.checks.check_trust_policies_are_not_empty()
self.assertEqual(upgradecheck.Code.SUCCESS, result.code)

View File

@ -194,6 +194,8 @@ class PolicyJsonTestCase(unit.TestCase):
'identity:revocation_list',
'identity:create_trust',
'identity:list_trusts',
'identity:list_trusts_for_trustor',
'identity:list_trusts_for_trustee',
'identity:list_roles_for_trust',
'identity:get_role_for_trust',
'identity:delete_trust',