Merge "Added tenant name validation. Fixes bug 966249."

This commit is contained in:
Jenkins 2012-05-18 16:35:18 +00:00 committed by Gerrit Code Review
commit 674287f808
7 changed files with 119 additions and 3 deletions

44
keystone/clean.py Normal file
View File

@ -0,0 +1,44 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone import exception
def check_length(property_name, value, min_length=1, max_length=64):
if len(value) < min_length:
if min_length == 1:
msg = "%s cannot be empty." % property_name
else:
msg = ("%(property_name)s cannot be less than "
"%(min_length)s characters.") % locals()
raise exception.ValidationError(msg)
if len(value) > max_length:
msg = ("%(property_name)s should not be greater than "
"%(max_length)s characters.") % locals()
raise exception.ValidationError(msg)
def check_type(property_name, value, expected_type, display_expected_type):
if not isinstance(value, expected_type):
msg = "%(property_name)s is not a %(display_expected_type)s" % locals()
raise exception.ValidationError(msg)
def tenant_name(name):
check_type("Tenant name", name, basestring, "string or unicode")
name = name.strip()
check_length("Tenant name", name)
return name

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystone import clean
from keystone import exception
from keystone import identity
from keystone.common import kvs
@ -195,6 +196,7 @@ class Identity(kvs.Base, identity.Driver):
return None
def create_tenant(self, tenant_id, tenant):
tenant['name'] = clean.tenant_name(tenant['name'])
if self.get_tenant(tenant_id):
msg = 'Duplicate ID, %s.' % tenant_id
raise exception.Conflict(type='tenant', details=msg)
@ -207,6 +209,7 @@ class Identity(kvs.Base, identity.Driver):
def update_tenant(self, tenant_id, tenant):
if 'name' in tenant:
tenant['name'] = clean.tenant_name(tenant['name'])
existing = self.db.get('tenant_name-%s' % tenant['name'])
if existing and tenant_id != existing['id']:
msg = 'Duplicate name, %s.' % tenant['name']

View File

@ -19,6 +19,7 @@ import uuid
import ldap
from ldap import filter as ldap_filter
from keystone import clean
from keystone import config
from keystone import exception
from keystone import identity
@ -166,12 +167,15 @@ class Identity(identity.Driver):
return self.user.update(user_id, user)
def create_tenant(self, tenant_id, tenant):
tenant['name'] = clean.tenant_name(tenant['name'])
data = tenant.copy()
if 'id' not in data or data['id'] is None:
data['id'] = str(uuid.uuid4().hex)
return self.tenant.create(tenant)
def update_tenant(self, tenant_id, tenant):
if 'name' in tenant:
tenant['name'] = clean.tenant_name(tenant['name'])
return self.tenant.update(tenant_id, tenant)
def create_metadata(self, user_id, tenant_id, metadata):

View File

@ -17,8 +17,9 @@
import copy
import functools
from keystone import identity
from keystone import clean
from keystone import exception
from keystone import identity
from keystone.common import sql
from keystone.common import utils
from keystone.common.sql import migration
@ -348,6 +349,7 @@ class Identity(sql.Base, identity.Driver):
@handle_conflicts(type='tenant')
def create_tenant(self, tenant_id, tenant):
tenant['name'] = clean.tenant_name(tenant['name'])
session = self.get_session()
with session.begin():
tenant_ref = Tenant.from_dict(tenant)
@ -357,6 +359,8 @@ class Identity(sql.Base, identity.Driver):
@handle_conflicts(type='tenant')
def update_tenant(self, tenant_id, tenant):
if 'name' in tenant:
tenant['name'] = clean.tenant_name(tenant['name'])
session = self.get_session()
with session.begin():
tenant_ref = session.query(Tenant).filter_by(id=tenant_id).first()

View File

@ -303,6 +303,65 @@ class IdentityTests(object):
tenants = self.identity_api.get_tenants_for_user('foo')
self.assertIn(tenant_id, tenants)
def test_create_tenant_long_name_fails(self):
tenant = {'id': 'fake1', 'name': 'a' * 65}
self.assertRaises(exception.ValidationError,
self.identity_api.create_tenant,
tenant['id'],
tenant)
def test_create_tenant_blank_name_fails(self):
tenant = {'id': 'fake1', 'name': ''}
self.assertRaises(exception.ValidationError,
self.identity_api.create_tenant,
tenant['id'],
tenant)
def test_create_tenant_invalid_name_fails(self):
tenant = {'id': 'fake1', 'name': None}
self.assertRaises(exception.ValidationError,
self.identity_api.create_tenant,
tenant['id'],
tenant)
tenant = {'id': 'fake1', 'name': 123}
self.assertRaises(exception.ValidationError,
self.identity_api.create_tenant,
tenant['id'],
tenant)
def test_update_tenant_blank_name_fails(self):
tenant = {'id': 'fake1', 'name': 'fake1'}
self.identity_api.create_tenant('fake1', tenant)
tenant['name'] = ''
self.assertRaises(exception.ValidationError,
self.identity_api.update_tenant,
tenant['id'],
tenant)
def test_update_tenant_long_name_fails(self):
tenant = {'id': 'fake1', 'name': 'fake1'}
self.identity_api.create_tenant('fake1', tenant)
tenant['name'] = 'a' * 65
self.assertRaises(exception.ValidationError,
self.identity_api.update_tenant,
tenant['id'],
tenant)
def test_update_tenant_invalid_name_fails(self):
tenant = {'id': 'fake1', 'name': 'fake1'}
self.identity_api.create_tenant('fake1', tenant)
tenant['name'] = None
self.assertRaises(exception.ValidationError,
self.identity_api.update_tenant,
tenant['id'],
tenant)
tenant['name'] = 123
self.assertRaises(exception.ValidationError,
self.identity_api.update_tenant,
tenant['id'],
tenant)
class TokenTests(object):
def test_token_crud(self):
@ -326,7 +385,8 @@ class TokenTests(object):
def test_expired_token(self):
token_id = uuid.uuid4().hex
expire_time = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
expire_time = datetime.datetime.utcnow() - datetime.timedelta(
minutes=1)
data = {'id': token_id, 'a': 'b', 'expires': expire_time}
data_ref = self.token_api.create_token(token_id, data)
self.assertDictEqual(data_ref, data)

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone import test
from keystone.identity.backends import kvs as identity_kvs
from keystone.token.backends import kvs as token_kvs

View File

@ -70,7 +70,7 @@ class SqlIdentity(test.TestCase, test_backend.IdentityTests):
def test_create_null_tenant_name(self):
tenant = {'id': uuid.uuid4().hex,
'name': None}
self.assertRaises(exception.Conflict,
self.assertRaises(exception.ValidationError,
self.identity_api.create_tenant,
tenant['id'],
tenant)