fix self-service credential APIs

Self-service credential APIs are broken in stable/rocky because
it no longer building the target attributes with the entity
data. Therefore, only "admin" can perform credential optionations.
The lack of self-service capability limits the usefulness of
these APIs. Arguably this is break backward compatibility as
self-service used to work in stable/queens and older releases.
Though we've never built the properly tests to guard this
functionality.

This patch re-enables self-service capability by conveying the
entity data via the target attributes. It also build the proper
tests for it.

Change-Id: Ic7dddc4d2fe7b6c6ae3bf6aed6c4c048743b3eed
Closes-Bug: 1815539
This commit is contained in:
Guang Yee 2019-02-15 17:14:18 -08:00
parent a2e307ed4d
commit 4420b78c01
2 changed files with 118 additions and 3 deletions

View File

@ -31,6 +31,20 @@ PROVIDERS = provider_api.ProviderAPIs
ENFORCER = rbac_enforcer.RBACEnforcer
def _build_target_enforcement():
target = {}
try:
target['credential'] = PROVIDERS.credential_api.get_credential(
flask.request.view_args.get('credential_id')
)
except exception.NotFound: # nosec
# Defer existance in the event the credential doesn't exist, we'll
# check this later anyway.
pass
return target
class CredentialResource(ks_flask.ResourceBase):
collection_key = 'credentials'
member_key = 'credential'
@ -83,7 +97,10 @@ class CredentialResource(ks_flask.ResourceBase):
return self.wrap_collection(refs, hints=hints)
def _get_credential(self, credential_id):
ENFORCER.enforce_call(action='identity:get_credential')
ENFORCER.enforce_call(
action='identity:get_credential',
target_attr=_build_target_enforcement()
)
ref = PROVIDERS.credential_api.get_credential(credential_id)
return self.wrap_member(self._blob_to_json(ref))
@ -108,7 +125,10 @@ class CredentialResource(ks_flask.ResourceBase):
def patch(self, credential_id):
# Update Credential
ENFORCER.enforce_call(action='identity:update_credential')
ENFORCER.enforce_call(
action='identity:update_credential',
target_attr=_build_target_enforcement()
)
credential = flask.request.json.get('credential', {})
validation.lazy_validate(schema.credential_update, credential)
self._require_matching_id(credential)
@ -118,7 +138,11 @@ class CredentialResource(ks_flask.ResourceBase):
def delete(self, credential_id):
# Delete credentials
ENFORCER.enforce_call(action='identity:delete_credential')
ENFORCER.enforce_call(
action='identity:delete_credential',
target_attr=_build_target_enforcement()
)
return (PROVIDERS.credential_api.delete_credential(credential_id),
http_client.NO_CONTENT)

View File

@ -17,6 +17,7 @@ import json
import uuid
from keystoneclient.contrib.ec2 import utils as ec2_utils
from oslo_serialization import jsonutils
from six.moves import http_client
from testtools import matchers
@ -27,6 +28,7 @@ from keystone.credential.providers import fernet as credential_fernet
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
from keystone.tests.unit import test_v3
@ -347,6 +349,95 @@ class CredentialTestCase(CredentialBaseTestCase):
self.assertValidCredentialResponse(r, ref)
class CredentialSelfServiceTestCase(CredentialBaseTestCase):
"""Test self-service credential CRUD."""
def _policy_fixture(self):
return ksfixtures.Policy(self.tmpfilename, self.config_fixture)
def _set_policy(self, new_policy):
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(jsonutils.dumps(new_policy))
def setUp(self):
self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
self.tmpfilename = self.tempfile.file_name
super(CredentialSelfServiceTestCase, self).setUp()
# set the self-service credential policies
self_service_credential_policies = {
"identity:create_credential": "user_id:%(credential.user_id)s",
"identity:list_credentials": "user_id:%(user_id)s",
"identity:get_credential": "user_id:%(target.credential.user_id)s",
"identity:update_credential":
"user_id:%(target.credential.user_id)s",
"identity:delete_credential":
"user_id:%(target.credential.user_id)s"
}
self._set_policy(self_service_credential_policies)
# remove the 'admin' role from user and replace it with an
# arbitrary role
PROVIDERS.assignment_api.remove_role_from_user_and_project(
self.user_id, self.project_id, self.role_id)
self.arbitrary_role = unit.new_role_ref(name=uuid.uuid4().hex)
PROVIDERS.role_api.create_role(self.arbitrary_role['id'],
self.arbitrary_role)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.arbitrary_role['id'])
self.credential = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
PROVIDERS.credential_api.create_credential(
self.credential['id'],
self.credential)
def test_list_credentials_filtered_by_user_id(self):
"""Call ``GET /credentials?user_id={user_id}``."""
credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
PROVIDERS.credential_api.create_credential(
credential['id'], credential
)
r = self.get('/credentials?user_id=%s' % self.user['id'])
self.assertValidCredentialListResponse(r, ref=self.credential)
for cred in r.result['credentials']:
self.assertEqual(self.user['id'], cred['user_id'])
def test_create_credential(self):
"""Call ``POST /credentials``."""
ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post(
'/credentials',
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
def test_get_credential(self):
"""Call ``GET /credentials/{credential_id}``."""
r = self.get(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential['id']})
self.assertValidCredentialResponse(r, self.credential)
def test_update_credential(self):
"""Call ``PATCH /credentials/{credential_id}``."""
ref = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
del ref['id']
r = self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential['id']},
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
def test_delete_credential(self):
"""Call ``DELETE /credentials/{credential_id}``."""
self.delete(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential['id']})
class TestCredentialTrustScoped(test_v3.RestfulTestCase):
"""Test credential with trust scoped token."""