Refactor create_trust for readability
I refactored the create_trust function in the keystone.trust.controllers module in order to make it more readable. In addition, the function will now check for all the required attributes in the request and return an appropriate error message if any of them is missing or is empty. Change-Id: I6f988a34e17e821b4a8544bec516845665ba2674
This commit is contained in:
parent
791671860d
commit
820e4f153a
@ -281,10 +281,31 @@ class Application(BaseApplication):
|
||||
# Accept either is_admin or the admin role
|
||||
self.policy_api.enforce(creds, 'admin_required', {})
|
||||
|
||||
def _require_attribute(self, ref, attr):
|
||||
"""Ensures the reference contains the specified attribute."""
|
||||
if ref.get(attr) is None or ref.get(attr) == '':
|
||||
msg = _('%s field is required and cannot be empty') % attr
|
||||
def _attribute_is_empty(self, ref, attribute):
|
||||
"""Returns true if the attribute in the given ref (which is a
|
||||
dict) is empty or None.
|
||||
"""
|
||||
return ref.get(attribute) is None or ref.get(attribute) == ''
|
||||
|
||||
def _require_attribute(self, ref, attribute):
|
||||
"""Ensures the reference contains the specified attribute.
|
||||
|
||||
Raise a ValidationError if the given attribute is not present
|
||||
"""
|
||||
if self._attribute_is_empty(ref, attribute):
|
||||
msg = _('%s field is required and cannot be empty') % attribute
|
||||
raise exception.ValidationError(message=msg)
|
||||
|
||||
def _require_attributes(self, ref, attrs):
|
||||
"""Ensures the reference contains the specified attributes.
|
||||
|
||||
Raise a ValidationError if any of the given attributes is not present
|
||||
"""
|
||||
missing_attrs = [attribute for attribute in attrs
|
||||
if self._attribute_is_empty(ref, attribute)]
|
||||
|
||||
if missing_attrs:
|
||||
msg = _('%s field(s) cannot be empty') % ', '.join(missing_attrs)
|
||||
raise exception.ValidationError(message=msg)
|
||||
|
||||
def _get_trust_id_for_request(self, context):
|
||||
|
@ -36,6 +36,27 @@ class FakeApp(wsgi.Application):
|
||||
return {'a': 'b'}
|
||||
|
||||
|
||||
class FakeAttributeCheckerApp(wsgi.Application):
|
||||
def index(self, context):
|
||||
return context['query_string']
|
||||
|
||||
def has_attribute(self, body, attr):
|
||||
body_dict = jsonutils.loads(body)
|
||||
try:
|
||||
self._require_attribute(body_dict, attr)
|
||||
return True
|
||||
except exception.ValidationError as ex:
|
||||
return (False, ex.message)
|
||||
|
||||
def has_attributes(self, body, attr):
|
||||
body_dict = jsonutils.loads(body)
|
||||
try:
|
||||
self._require_attributes(body_dict, attr)
|
||||
return True
|
||||
except exception.ValidationError as ex:
|
||||
return (False, ex.message)
|
||||
|
||||
|
||||
class BaseWSGITest(tests.TestCase):
|
||||
def setUp(self):
|
||||
self.app = FakeApp()
|
||||
@ -89,6 +110,43 @@ class ApplicationTest(BaseWSGITest):
|
||||
self.assertEqual(resp.status, '501 Not Implemented')
|
||||
self.assertEqual(resp.status_int, 501)
|
||||
|
||||
def test_successful_require_attribute(self):
|
||||
app = FakeAttributeCheckerApp()
|
||||
req = self._make_request(url='/?1=2')
|
||||
resp = req.get_response(app)
|
||||
self.assertTrue(app.has_attribute(resp.body, '1'))
|
||||
|
||||
def test_require_attribute(self):
|
||||
app = FakeAttributeCheckerApp()
|
||||
req = self._make_request(url='/?1=2')
|
||||
resp = req.get_response(app)
|
||||
self.assertFalse(app.has_attribute(resp.body, 'a')[0])
|
||||
|
||||
def test_successful_require_multiple_attributes(self):
|
||||
app = FakeAttributeCheckerApp()
|
||||
req = self._make_request(url='/?a=1&b=2')
|
||||
resp = req.get_response(app)
|
||||
self.assertTrue(app.has_attributes(resp.body, ['a', 'b']))
|
||||
|
||||
def test_attribute_missing_from_request(self):
|
||||
app = FakeAttributeCheckerApp()
|
||||
req = self._make_request(url='/?a=1&b=2')
|
||||
resp = req.get_response(app)
|
||||
validation = app.has_attributes(resp.body, ['a',
|
||||
'missing_attribute'])
|
||||
self.assertFalse(validation[0])
|
||||
self.assertTrue('missing_attribute' in validation[1])
|
||||
|
||||
def test_no_required_attributes_present(self):
|
||||
app = FakeAttributeCheckerApp()
|
||||
req = self._make_request(url='/')
|
||||
resp = req.get_response(app)
|
||||
validation = app.has_attributes(resp.body, ['missing_attribute1',
|
||||
'missing_attribute2'])
|
||||
self.assertFalse(validation[0])
|
||||
self.assertTrue('missing_attribute1' in validation[1])
|
||||
self.assertTrue('missing_attribute2' in validation[1])
|
||||
|
||||
def test_render_response_custom_headers(self):
|
||||
resp = wsgi.render_response(headers=[('Custom-Header', 'Some-Value')])
|
||||
self.assertEqual(resp.headers.get('Custom-Header'), 'Some-Value')
|
||||
|
@ -30,11 +30,6 @@ LOG = log.getLogger(__name__)
|
||||
CONF = config.CONF
|
||||
|
||||
|
||||
def _trustor_only(context, trust, user_id):
|
||||
if user_id != trust.get('trustor_user_id'):
|
||||
raise exception.Forbidden()
|
||||
|
||||
|
||||
def _trustor_trustee_only(trust, user_id):
|
||||
if (user_id != trust.get('trustee_user_id') and
|
||||
user_id != trust.get('trustor_user_id')):
|
||||
@ -129,54 +124,63 @@ class TrustV3(controller.V3Controller):
|
||||
The user creating the trust must be the trustor.
|
||||
|
||||
"""
|
||||
|
||||
# TODO(ayoung): instead of raising ValidationError on the first
|
||||
# problem, return a collection of all the problems.
|
||||
if not trust:
|
||||
raise exception.ValidationError(attribute='trust',
|
||||
target='request')
|
||||
self._require_attributes(trust, ['impersonation', 'trustee_user_id',
|
||||
'trustor_user_id'])
|
||||
if trust.get('project_id'):
|
||||
self._require_role(trust)
|
||||
self._require_user_is_trustor(context, trust)
|
||||
self._require_trustee_exists(trust['trustee_user_id'])
|
||||
all_roles = self.assignment_api.list_roles()
|
||||
clean_roles = self._clean_role_list(context, trust, all_roles)
|
||||
self._require_trustor_has_role_in_project(trust, clean_roles)
|
||||
trust['expires_at'] = self._parse_expiration_date(
|
||||
trust.get('expires_at'))
|
||||
trust_id = uuid.uuid4().hex
|
||||
new_trust = self.trust_api.create_trust(trust_id, trust, clean_roles)
|
||||
self._fill_in_roles(context, new_trust, all_roles)
|
||||
return TrustV3.wrap_member(context, new_trust)
|
||||
|
||||
self._require_attribute(trust, 'impersonation')
|
||||
self._require_attribute(trust, 'trustee_user_id')
|
||||
def _require_trustee_exists(self, trustee_user_id):
|
||||
self.identity_api.get_user(trustee_user_id)
|
||||
|
||||
if trust.get('project_id') and not trust.get('roles'):
|
||||
def _require_user_is_trustor(self, context, trust):
|
||||
user_id = self._get_user_id(context)
|
||||
if user_id != trust.get('trustor_user_id'):
|
||||
raise exception.Forbidden(
|
||||
_("The authenticated user should match the trustor."))
|
||||
|
||||
def _require_role(self, trust):
|
||||
if not trust.get('roles'):
|
||||
raise exception.Forbidden(
|
||||
_('At least one role should be specified.'))
|
||||
|
||||
def _get_user_role(self, trust):
|
||||
if not self._attribute_is_empty(trust, 'project_id'):
|
||||
return self.assignment_api.get_roles_for_user_and_project(
|
||||
trust['trustor_user_id'], trust['project_id'])
|
||||
else:
|
||||
return []
|
||||
|
||||
def _require_trustor_has_role_in_project(self, trust, clean_roles):
|
||||
user_roles = self._get_user_role(trust)
|
||||
for trust_role in clean_roles:
|
||||
matching_roles = [x for x in user_roles
|
||||
if x == trust_role['id']]
|
||||
if not matching_roles:
|
||||
raise exception.RoleNotFound(role_id=trust_role['id'])
|
||||
|
||||
def _parse_expiration_date(self, expiration_date):
|
||||
if expiration_date is None:
|
||||
return None
|
||||
if not expiration_date.endswith('Z'):
|
||||
expiration_date += 'Z'
|
||||
try:
|
||||
user_id = self._get_user_id(context)
|
||||
_trustor_only(context, trust, user_id)
|
||||
# confirm that the trustee exists
|
||||
self.identity_api.get_user(trust['trustee_user_id'])
|
||||
all_roles = self.assignment_api.list_roles()
|
||||
clean_roles = self._clean_role_list(context, trust, all_roles)
|
||||
if trust.get('project_id'):
|
||||
user_role = self.assignment_api.get_roles_for_user_and_project(
|
||||
user_id,
|
||||
trust['project_id'])
|
||||
else:
|
||||
user_role = []
|
||||
for trust_role in clean_roles:
|
||||
matching_roles = [x for x in user_role
|
||||
if x == trust_role['id']]
|
||||
if not matching_roles:
|
||||
raise exception.RoleNotFound(role_id=trust_role['id'])
|
||||
if trust.get('expires_at') is not None:
|
||||
if not trust['expires_at'].endswith('Z'):
|
||||
trust['expires_at'] += 'Z'
|
||||
try:
|
||||
trust['expires_at'] = (timeutils.parse_isotime
|
||||
(trust['expires_at']))
|
||||
except ValueError:
|
||||
raise exception.ValidationTimeStampError()
|
||||
trust_id = uuid.uuid4().hex
|
||||
new_trust = self.trust_api.create_trust(trust_id,
|
||||
trust,
|
||||
clean_roles)
|
||||
self._fill_in_roles(context, new_trust, all_roles)
|
||||
return TrustV3.wrap_member(context, new_trust)
|
||||
except KeyError as e:
|
||||
raise exception.ValidationError(attribute=e.args[0],
|
||||
target='trust')
|
||||
return timeutils.parse_isotime(expiration_date)
|
||||
except ValueError:
|
||||
raise exception.ValidationTimeStampError()
|
||||
|
||||
@controller.protected()
|
||||
def list_trusts(self, context):
|
||||
|
Loading…
x
Reference in New Issue
Block a user