Add Trusts unique constraint to remove duplicates

For now, effectively there could be multiple trusts with the same
project, trustor, trustee, expiry date, impersonation. The same
combination can have multiple trusts assigned with different roles
or not.

Patch fixes this issue by adding unique constraint to the trusts
database model. If two requests create trusts with the same
trustor, trustee, project, expiry, impersonation, then the second
request would bring up an exception saying there's a conflict.

This can help to improve specific trusts identification and
improve user experience.

Change-Id: I1a681b13cfbef40bf6c21271fb80966517fb1ec5
Closes-Bug: #1475091
This commit is contained in:
Kent Wang 2015-10-23 05:58:13 -07:00
parent 40453931a9
commit 59b09b50ff
5 changed files with 76 additions and 2 deletions

View File

@ -0,0 +1,26 @@
# Copyright 2015 Intel Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate import UniqueConstraint
from sqlalchemy import MetaData, Table
def upgrade(migrate_engine):
meta = MetaData(bind=migrate_engine)
trusts = Table('trust', meta, autoload=True)
UniqueConstraint('trustor_user_id', 'trustee_user_id', 'project_id',
'impersonation', 'expires_at', table=trusts,
name='duplicate_trust_constraint').create()

View File

@ -4786,13 +4786,13 @@ class TrustTests(object):
def create_sample_trust(self, new_id, remaining_uses=None):
self.trustor = self.user_foo
self.trustee = self.user_two
expires_at = datetime.datetime.utcnow().replace(year=2031)
trust_data = (self.trust_api.create_trust
(new_id,
{'trustor_user_id': self.trustor['id'],
'trustee_user_id': self.user_two['id'],
'project_id': self.tenant_bar['id'],
'expires_at': timeutils.
parse_isotime('2031-02-18T18:10:00Z'),
'expires_at': expires_at,
'impersonation': True,
'remaining_uses': remaining_uses},
roles=[{"id": "member"},
@ -4914,6 +4914,26 @@ class TrustTests(object):
self.trust_api.get_trust,
trust_data['id'])
def test_duplicate_trusts_not_allowed(self):
self.trustor = self.user_foo
self.trustee = self.user_two
trust_data = {'trustor_user_id': self.trustor['id'],
'trustee_user_id': self.user_two['id'],
'project_id': self.tenant_bar['id'],
'expires_at': timeutils.parse_isotime(
'2031-02-18T18:10:00Z'),
'impersonation': True,
'remaining_uses': None}
roles = [{"id": "member"},
{"id": "other"},
{"id": "browser"}]
self.trust_api.create_trust(uuid.uuid4().hex, trust_data, roles)
self.assertRaises(exception.Conflict,
self.trust_api.create_trust,
uuid.uuid4().hex,
trust_data,
roles)
class CatalogTests(object):

View File

@ -808,6 +808,13 @@ class SqlUpgradeTests(SqlMigrateBase):
self.assertTableDoesNotExist('endpoint_group')
self.assertTableDoesNotExist('project_endpoint_group')
def test_add_trust_unique_constraint_upgrade(self):
self.upgrade(86)
inspector = reflection.Inspector.from_engine(self.engine)
constraints = inspector.get_unique_constraints('trust')
constraint_names = [constraint['name'] for constraint in constraints]
self.assertIn('duplicate_trust_constraint', constraint_names)
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user

View File

@ -2847,6 +2847,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
# Create first trust with extended set of roles
ref = self.redelegated_trust_ref
ref['expires_at'] = datetime.datetime.utcnow().replace(
year=2031).strftime(unit.TIME_FORMAT)
ref['roles'].append({'id': role['id']})
r = self.post('/OS-TRUST/trusts',
body={'trust': ref})
@ -2859,6 +2861,9 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
trust_token = self._get_trust_token(trust)
# Chain second trust with roles subset
self.chained_trust_ref['expires_at'] = (
datetime.datetime.utcnow().replace(year=2030).strftime(
unit.TIME_FORMAT))
r = self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref},
token=trust_token)
@ -2879,6 +2884,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
expires=dict(minutes=1),
role_names=[self.role['name']],
allow_redelegation=True)
ref['expires_at'] = datetime.datetime.utcnow().replace(
year=2031).strftime(unit.TIME_FORMAT)
r = self.post('/OS-TRUST/trusts',
body={'trust': ref})
trust = self.assertValidTrustResponse(r)
@ -2892,6 +2899,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
impersonation=True,
role_names=[self.role['name']],
allow_redelegation=True)
ref['expires_at'] = datetime.datetime.utcnow().replace(
year=2030).strftime(unit.TIME_FORMAT)
r = self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=trust_token)
@ -2924,12 +2933,18 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
expected_status=http_client.FORBIDDEN)
def test_redelegation_terminator(self):
self.redelegated_trust_ref['expires_at'] = (
datetime.datetime.utcnow().replace(year=2031).strftime(
unit.TIME_FORMAT))
r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust)
# Build second trust - the terminator
self.chained_trust_ref['expires_at'] = (
datetime.datetime.utcnow().replace(year=2030).strftime(
unit.TIME_FORMAT))
ref = dict(self.chained_trust_ref,
redelegation_count=1,
allow_redelegation=False)
@ -3834,6 +3849,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
role_ids=[self.role_id])
for i in range(3):
ref['expires_at'] = datetime.datetime.utcnow().replace(
year=2031).strftime(unit.TIME_FORMAT)
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
self.assertValidTrustResponse(r, ref)

View File

@ -45,6 +45,10 @@ class TrustModel(sql.ModelBase, sql.DictBase):
expires_at = sql.Column(sql.DateTime)
remaining_uses = sql.Column(sql.Integer, nullable=True)
extra = sql.Column(sql.JsonBlob())
__table_args__ = (sql.UniqueConstraint(
'trustor_user_id', 'trustee_user_id', 'project_id',
'impersonation', 'expires_at',
name='duplicate_trust_constraint'),)
class TrustRole(sql.ModelBase):