Limited use trusts

Trusts now have a "remaining_uses" field that tracks how many times
a trust can still issue a token. It is decremented by 1 each time a
trust related authentication occurs (call to /auth/tokens), until it
reaches 0 and no token can be issued through this trust anymore. If
set to null (default value), trusts can be used indefinitely to
authenticate.

Closes-Bug: #1250617
Implements: bp trusts-chained-delegation
DocImpact
Co-Authored-By: Florent Flament <florent.flament-ext@cloudwatt.com>

Change-Id: I2c80b6d548a6715da0366c6f64ee58fbce514adb
This commit is contained in:
Matthieu Huin 2013-11-13 17:24:33 +01:00 committed by Dolph Mathews
parent cf20a1a3d8
commit db9e0c6c4a
11 changed files with 331 additions and 7 deletions

View File

@ -286,7 +286,7 @@ class AuthInfo(object):
@dependency.requires('assignment_api', 'identity_api', 'token_api',
'token_provider_api')
'token_provider_api', 'trust_api')
class Auth(controller.V3Controller):
# Note(atiwari): From V3 auth controller code we are
@ -318,6 +318,10 @@ class Auth(controller.V3Controller):
auth_info.set_scope(None, auth_context['project_id'], None)
self._check_and_set_default_scoping(auth_info, auth_context)
(domain_id, project_id, trust) = auth_info.get_scope()
if trust:
self.trust_api.consume_use(trust['id'])
method_names = auth_info.get_method_names()
method_names += auth_context.get('method_names', [])
# make sure the list is unique

View File

@ -0,0 +1,44 @@
# Copyright (c) 2014 Matthieu Huin <mhu@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
def downgrade_trust_table_with_column_drop(meta, migrate_engine):
trust_table = sqlalchemy.Table('trust', meta, autoload=True)
# delete trusts with a limited use count, we are downgrading so uses
# will not be tracked anymore.
d = trust_table.delete(trust_table.c.remaining_uses >= 0)
d.execute()
trust_table.drop_column('remaining_uses')
def upgrade_trust_table(meta, migrate_engine):
trust_table = sqlalchemy.Table('trust', meta, autoload=True)
trust_table.create_column(sqlalchemy.Column('remaining_uses',
sqlalchemy.Integer(),
nullable=True))
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
upgrade_trust_table(meta, migrate_engine)
def downgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
downgrade_trust_table_with_column_drop(meta, migrate_engine)

View File

@ -236,6 +236,10 @@ class TrustNotFound(NotFound):
message_format = _("Could not find trust, %(trust_id)s.")
class TrustUseLimitReached(Forbidden):
message_format = _("No remaining uses for trust %(trust_id)s.")
class CredentialNotFound(NotFound):
message_format = _("Could not find credential, %(credential_id)s.")

View File

@ -3367,7 +3367,7 @@ class TokenCacheInvalidation(object):
class TrustTests(object):
def create_sample_trust(self, new_id):
def create_sample_trust(self, new_id, remaining_uses=None):
self.trustor = self.user_foo
self.trustee = self.user_two
trust_data = (self.trust_api.create_trust
@ -3377,7 +3377,8 @@ class TrustTests(object):
'project_id': self.tenant_bar['id'],
'expires_at': timeutils.
parse_isotime('2031-02-18T18:10:00Z'),
'impersonation': True},
'impersonation': True,
'remaining_uses': remaining_uses},
roles=[{"id": "member"},
{"id": "other"},
{"id": "browser"}]))
@ -3445,6 +3446,39 @@ class TrustTests(object):
trusts = self.trust_api.list_trusts()
self.assertEqual(len(trusts), 3)
def test_trust_has_remaining_uses_positive(self):
# create a trust with limited uses, check that we have uses left
trust_data = self.create_sample_trust(uuid.uuid4().hex,
remaining_uses=5)
self.assertEqual(5, trust_data['remaining_uses'])
# create a trust with unlimited uses, check that we have uses left
trust_data = self.create_sample_trust(uuid.uuid4().hex)
self.assertIsNone(trust_data['remaining_uses'])
def test_trust_has_remaining_uses_negative(self):
# try to create a trust with no remaining uses, check that it fails
self.assertRaises(exception.ValidationError,
self.create_sample_trust,
uuid.uuid4().hex,
remaining_uses=0)
# try to create a trust with negative remaining uses,
# check that it fails
self.assertRaises(exception.ValidationError,
self.create_sample_trust,
uuid.uuid4().hex,
remaining_uses=-12)
def test_consume_use(self):
# consume a trust repeatedly until it has no uses anymore
trust_data = self.create_sample_trust(uuid.uuid4().hex,
remaining_uses=2)
self.trust_api.consume_use(trust_data['id'])
t = self.trust_api.get_trust(trust_data['id'])
self.assertEqual(1, t['remaining_uses'])
self.trust_api.consume_use(trust_data['id'])
# This was the last use, the trust isn't available anymore
self.assertIsNone(self.trust_api.get_trust(trust_data['id']))
class CommonHelperTests(tests.TestCase):
def test_format_helper_raises_malformed_on_missing_key(self):

View File

@ -2020,6 +2020,78 @@ class SqlUpgradeTests(SqlMigrateBase):
check_grants(session, base_data)
session.close()
def test_limited_trusts_upgrade(self):
# make sure that the remaining_uses column is created
self.upgrade(41)
self.assertTableColumns('trust',
['id', 'trustor_user_id',
'trustee_user_id',
'project_id', 'impersonation',
'deleted_at',
'expires_at', 'extra',
'remaining_uses'])
def test_limited_trusts_downgrade(self):
# make sure that the remaining_uses column is removed
self.upgrade(41)
self.downgrade(40)
self.assertTableColumns('trust',
['id', 'trustor_user_id',
'trustee_user_id',
'project_id', 'impersonation',
'deleted_at',
'expires_at', 'extra'])
def test_limited_trusts_downgrade_trusts_cleanup(self):
# make sure that only trusts with unlimited uses are kept in the
# downgrade
self.upgrade(41)
session = self.Session()
trust_table = sqlalchemy.Table(
'trust', self.metadata, autoload=True)
limited_trust = {
'id': uuid.uuid4().hex,
'trustor_user_id': uuid.uuid4().hex,
'trustee_user_id': uuid.uuid4().hex,
'project_id': uuid.uuid4().hex,
'impersonation': True,
'remaining_uses': 5
}
consumed_trust = {
'id': uuid.uuid4().hex,
'trustor_user_id': uuid.uuid4().hex,
'trustee_user_id': uuid.uuid4().hex,
'project_id': uuid.uuid4().hex,
'impersonation': True,
'remaining_uses': 0
}
unlimited_trust = {
'id': uuid.uuid4().hex,
'trustor_user_id': uuid.uuid4().hex,
'trustee_user_id': uuid.uuid4().hex,
'project_id': uuid.uuid4().hex,
'impersonation': True,
'remaining_uses': None
}
self.insert_dict(session, 'trust', limited_trust)
self.insert_dict(session, 'trust', consumed_trust)
self.insert_dict(session, 'trust', unlimited_trust)
trust_table = sqlalchemy.Table(
'trust', self.metadata, autoload=True)
# we should have 3 trusts in base
self.assertEqual(3, session.query(trust_table).count())
self.downgrade(40)
session = self.Session()
trust_table = sqlalchemy.Table(
'trust', self.metadata, autoload=True)
# Now only one trust remains ...
self.assertEqual(1, session.query(trust_table.columns.id).count())
# ... and this trust is the one that was not limited in uses
self.assertEqual(
unlimited_trust['id'],
session.query(trust_table.columns.id).one()[0])
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user

View File

@ -242,13 +242,14 @@ class RestfulTestCase(rest.RestfulTestCase):
def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
impersonation=None, expires=None, role_ids=None,
role_names=None):
role_names=None, remaining_uses=None):
ref = self.new_ref()
ref['trustor_user_id'] = trustor_user_id
ref['trustee_user_id'] = trustee_user_id
ref['impersonation'] = impersonation or False
ref['project_id'] = project_id
ref['remaining_uses'] = remaining_uses
if isinstance(expires, six.string_types):
ref['expires_at'] = expires

View File

@ -2037,6 +2037,109 @@ class TestTrustAuth(TestAuthInfo):
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
self.assertValidTrustResponse(r, ref)
def _initialize_test_consume_trust(self, count):
# Make sure remaining_uses is decremented as we consume the trust
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
remaining_uses=count,
role_ids=[self.role_id])
del ref['id']
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
# make sure the trust exists
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=200)
# get a token for the trustee
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'])
r = self.post('/auth/tokens', body=auth_data, expected_status=201)
token = r.headers.get('X-Subject-Token')
# get a trust token, consume one use
auth_data = self.build_authentication_request(
token=token,
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data, expected_status=201)
return trust
def test_consume_trust_once(self):
trust = self._initialize_test_consume_trust(2)
# check decremented value
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=200)
trust = r.result.get('trust')
self.assertIsNotNone(trust)
self.assertEqual(trust['remaining_uses'], 1)
def test_create_one_time_use_trust(self):
trust = self._initialize_test_consume_trust(1)
# No more uses, the trust is made unavailable
self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=404)
# this time we can't get a trust token
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_create_trust_with_bad_values_for_remaining_uses(self):
# negative values for the remaining_uses parameter are forbidden
self._create_trust_with_bad_remaining_use(bad_value=-1)
# 0 is a forbidden value as well
self._create_trust_with_bad_remaining_use(bad_value=0)
# as are non integer values
self._create_trust_with_bad_remaining_use(bad_value="a bad value")
self._create_trust_with_bad_remaining_use(bad_value=7.2)
def _create_trust_with_bad_remaining_use(self, bad_value):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
remaining_uses=bad_value,
role_ids=[self.role_id])
del ref['id']
self.post('/OS-TRUST/trusts',
body={'trust': ref},
expected_status=400)
def test_create_unlimited_use_trust(self):
# by default trusts are unlimited in terms of tokens that can be
# generated from them, this test creates such a trust explicitly
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
remaining_uses=None,
role_ids=[self.role_id])
del ref['id']
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=200)
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'])
r = self.post('/auth/tokens', body=auth_data, expected_status=201)
token = r.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(
token=token,
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data, expected_status=201)
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=200)
trust = r.result.get('trust')
self.assertIsNone(trust['remaining_uses'])
def test_trust_crud(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,

View File

@ -183,6 +183,8 @@ class Auth(controller.V2Controller):
trust_ref['trustee_user_id'])
if not trustee_user_ref['enabled']:
raise exception.Forbidden()()
self.trust_api.consume_use(auth['trust_id'])
if trust_ref['impersonation'] is True:
current_user_ref = trustor_user_ref
else:

View File

@ -28,13 +28,18 @@ def _filter_trust(ref):
return None
if ref.get('expires_at') and timeutils.utcnow() > ref['expires_at']:
return None
remaining_uses = ref.get('remaining_uses')
# Do not return trusts that can't be used anymore
if remaining_uses is not None:
if remaining_uses <= 0:
return None
ref = copy.deepcopy(ref)
return ref
class Trust(kvs.Base, trust.Driver):
def create_trust(self, trust_id, trust, roles):
trust_ref = trust
trust_ref = copy.deepcopy(trust)
trust_ref['id'] = trust_id
trust_ref['deleted'] = False
trust_ref['roles'] = roles
@ -52,7 +57,23 @@ class Trust(kvs.Base, trust.Driver):
trustor_list = self.db.get('trustor-%s' % trustor_user_id, [])
trustor_list.append(trust_id)
self.db.set('trustor-%s' % trustor_user_id, trustor_list)
return copy.deepcopy(trust_ref)
return trust_ref
def consume_use(self, trust_id):
try:
orig_ref = self.db.get('trust-%s' % trust_id)
except exception.NotFound:
raise exception.TrustNotFound(trust_id=trust_id)
remaining_uses = orig_ref.get('remaining_uses')
if remaining_uses is None:
# unlimited uses, do nothing
return
elif remaining_uses > 0:
ref = copy.deepcopy(orig_ref)
ref['remaining_uses'] -= 1
self.db.set('trust-%s' % trust_id, ref)
else:
raise exception.TrustUseLimitReached(trust_id=trust_id)
def get_trust(self, trust_id):
try:

View File

@ -22,7 +22,8 @@ from keystone import trust
class TrustModel(sql.ModelBase, sql.DictBase):
__tablename__ = 'trust'
attributes = ['id', 'trustor_user_id', 'trustee_user_id',
'project_id', 'impersonation', 'expires_at']
'project_id', 'impersonation', 'expires_at',
'remaining_uses']
id = sql.Column(sql.String(64), primary_key=True)
#user id Of owner
trustor_user_id = sql.Column(sql.String(64), nullable=False,)
@ -32,6 +33,7 @@ class TrustModel(sql.ModelBase, sql.DictBase):
impersonation = sql.Column(sql.Boolean, nullable=False)
deleted_at = sql.Column(sql.DateTime)
expires_at = sql.Column(sql.DateTime)
remaining_uses = sql.Column(sql.Integer, nullable=True)
extra = sql.Column(sql.JsonBlob())
@ -70,6 +72,23 @@ class Trust(trust.Driver):
trust_dict['roles'] = roles
@sql.handle_conflicts(conflict_type='trust')
def consume_use(self, trust_id):
session = db_session.get_session()
with session.begin():
ref = (session.query(TrustModel).
with_lockmode('update').
filter_by(deleted_at=None).
filter_by(id=trust_id).first())
if ref is None:
raise exception.TrustNotFound(trust_id=trust_id)
if ref.remaining_uses is None:
# unlimited uses, do nothing
pass
elif ref.remaining_uses > 0:
ref.remaining_uses -= 1
else:
raise exception.TrustUseLimitReached(trust_id=trust_id)
def get_trust(self, trust_id):
session = db_session.get_session()
ref = (session.query(TrustModel).
@ -81,6 +100,10 @@ class Trust(trust.Driver):
now = timeutils.utcnow()
if now > ref.expires_at:
return None
# Do not return trusts that can't be used anymore
if ref.remaining_uses is not None:
if ref.remaining_uses <= 0:
return None
trust_dict = ref.to_dict()
self._add_roles(trust_id, session, trust_dict)

View File

@ -50,6 +50,12 @@ class Manager(manager.Manager):
:returns: a new trust
"""
trust.setdefault('remaining_uses', None)
if trust['remaining_uses'] is not None:
if (trust['remaining_uses'] <= 0 or
not isinstance(trust['remaining_uses'], int)):
msg = _('remaining_uses must be a positive integer or null.')
raise exception.ValidationError(msg)
return self.driver.create_trust(trust_id, trust, roles)
@notifications.deleted(_TRUST)
@ -91,3 +97,13 @@ class Driver(object):
@abc.abstractmethod
def delete_trust(self, trust_id):
raise exception.NotImplemented()
@abc.abstractmethod
def consume_use(self, trust_id):
"""Consume one use when a trust was created with a limitation on its
uses, provided there are still uses available.
:raises: keystone.exception.TrustUseLimitReached,
keystone.exception.TrustNotFound
"""
raise exception.NotImplemented()