Support url safe restriction on new projects and domains

The first phase of providing optional support for ensuring projects
and domains do not contain any reserved characters. Subsequent
patches will support the 'strict' option for such names.

A release note will be added at the end of the chain of patches,
once the full support has been implemented.

Partially Implements: blueprint url-safe-naming

Change-Id: I9b06cb5fa521d2cd3919c072a996c528d387dfe8
This commit is contained in:
Henry Nash 2015-12-13 00:15:34 +00:00
parent 255b317ffd
commit 10754f7fb4
5 changed files with 200 additions and 0 deletions

View File

@ -397,6 +397,18 @@ FILE_OPTIONS = {
'operations on remote services. Tokens scoped to ' 'operations on remote services. Tokens scoped to '
'this project will contain the key/value ' 'this project will contain the key/value '
'`is_admin_project=true`. Defaults to None.'), '`is_admin_project=true`. Defaults to None.'),
cfg.StrOpt('project_name_url_safe',
choices=['off', 'new'], default='off',
help='Whether the names of projects are restricted from '
'containing url reserved characters. If set to new, '
'attempts to create or update a project with a url '
'unsafe name will return an error.'),
cfg.StrOpt('domain_name_url_safe',
choices=['off', 'new'], default='off',
help='Whether the names of domains are restricted from '
'containing url reserved characters. If set to new, '
'attempts to create or update a domain with a url '
'unsafe name will return an error.'),
], ],
'domain_config': [ 'domain_config': [
cfg.StrOpt('driver', cfg.StrOpt('driver',

View File

@ -523,3 +523,20 @@ def get_token_ref(context):
except KeyError: except KeyError:
LOG.warning(_LW("Couldn't find the auth context.")) LOG.warning(_LW("Couldn't find the auth context."))
raise exception.Unauthorized() raise exception.Unauthorized()
URL_RESERVED_CHARS = ":/?#[]@!$&'()*+,;="
def is_not_url_safe(name):
"""Check if a string contains any url reserved characters."""
return len(list_url_unsafe_chars(name)) > 0
def list_url_unsafe_chars(name):
"""Return a list of the reserved characters."""
reserved_chars = ''
for i in name:
if i in URL_RESERVED_CHARS:
reserved_chars += i
return reserved_chars

View File

@ -23,6 +23,7 @@ from keystone.common import clean
from keystone.common import dependency from keystone.common import dependency
from keystone.common import driver_hints from keystone.common import driver_hints
from keystone.common import manager from keystone.common import manager
from keystone.common import utils
from keystone import exception from keystone import exception
from keystone.i18n import _, _LE, _LW from keystone.i18n import _, _LE, _LW
from keystone import notifications from keystone import notifications
@ -158,8 +159,22 @@ class Manager(manager.Manager):
self._assert_max_hierarchy_depth(project_ref.get('parent_id'), self._assert_max_hierarchy_depth(project_ref.get('parent_id'),
parents_list) parents_list)
def _raise_reserved_character_exception(self, entity_type, name):
msg = _('%(entity)s name cannot contain the following reserved '
'characters: %(chars)s')
raise exception.ValidationError(
message=msg % {
'entity': entity_type,
'chars': utils.list_url_unsafe_chars(name)
})
def create_project(self, tenant_id, tenant, initiator=None): def create_project(self, tenant_id, tenant, initiator=None):
tenant = tenant.copy() tenant = tenant.copy()
if (CONF.resource.project_name_url_safe != 'off' and
utils.is_not_url_safe(tenant['name'])):
self._raise_reserved_character_exception('Project', tenant['name'])
tenant.setdefault('enabled', True) tenant.setdefault('enabled', True)
tenant['enabled'] = clean.project_enabled(tenant['enabled']) tenant['enabled'] = clean.project_enabled(tenant['enabled'])
tenant.setdefault('description', '') tenant.setdefault('description', '')
@ -264,6 +279,12 @@ class Manager(manager.Manager):
original_tenant = self.driver.get_project(tenant_id) original_tenant = self.driver.get_project(tenant_id)
tenant = tenant.copy() tenant = tenant.copy()
if (CONF.resource.project_name_url_safe != 'off' and
'name' in tenant and
tenant['name'] != original_tenant['name'] and
utils.is_not_url_safe(tenant['name'])):
self._raise_reserved_character_exception('Project', tenant['name'])
parent_id = original_tenant.get('parent_id') parent_id = original_tenant.get('parent_id')
if 'parent_id' in tenant and tenant.get('parent_id') != parent_id: if 'parent_id' in tenant and tenant.get('parent_id') != parent_id:
raise exception.ForbiddenAction( raise exception.ForbiddenAction(
@ -476,6 +497,10 @@ class Manager(manager.Manager):
if (not self.identity_api.multiple_domains_supported and if (not self.identity_api.multiple_domains_supported and
domain_id != CONF.identity.default_domain_id): domain_id != CONF.identity.default_domain_id):
raise exception.Forbidden(_('Multiple domains are not supported')) raise exception.Forbidden(_('Multiple domains are not supported'))
if (CONF.resource.domain_name_url_safe != 'off' and
utils.is_not_url_safe(domain['name'])):
self._raise_reserved_character_exception('Domain', domain['name'])
self.assert_domain_not_federated(domain_id, domain) self.assert_domain_not_federated(domain_id, domain)
domain.setdefault('enabled', True) domain.setdefault('enabled', True)
domain['enabled'] = clean.domain_enabled(domain['enabled']) domain['enabled'] = clean.domain_enabled(domain['enabled'])
@ -508,6 +533,10 @@ class Manager(manager.Manager):
def update_domain(self, domain_id, domain, initiator=None): def update_domain(self, domain_id, domain, initiator=None):
self.assert_domain_not_federated(domain_id, domain) self.assert_domain_not_federated(domain_id, domain)
original_domain = self.driver.get_domain(domain_id) original_domain = self.driver.get_domain(domain_id)
if (CONF.resource.domain_name_url_safe != 'off' and
'name' in domain and domain['name'] != original_domain['name'] and
utils.is_not_url_safe(domain['name'])):
self._raise_reserved_character_exception('Domain', domain['name'])
if 'enabled' in domain: if 'enabled' in domain:
domain['enabled'] = clean.domain_enabled(domain['enabled']) domain['enabled'] = clean.domain_enabled(domain['enabled'])
ret = self.driver.update_domain(domain_id, domain) ret = self.driver.update_domain(domain_id, domain)

View File

@ -153,6 +153,18 @@ class UtilsTestCase(unit.BaseTestCase):
expected_json = '{"field":"value"}' expected_json = '{"field":"value"}'
self.assertEqual(expected_json, json) self.assertEqual(expected_json, json)
def test_url_safe_check(self):
base_str = 'i am safe'
self.assertFalse(common_utils.is_not_url_safe(base_str))
for i in common_utils.URL_RESERVED_CHARS:
self.assertTrue(common_utils.is_not_url_safe(base_str + i))
def test_url_safe_with_unicode_check(self):
base_str = u'i am \xe7afe'
self.assertFalse(common_utils.is_not_url_safe(base_str))
for i in common_utils.URL_RESERVED_CHARS:
self.assertTrue(common_utils.is_not_url_safe(base_str + i))
class ServiceHelperTests(unit.BaseTestCase): class ServiceHelperTests(unit.BaseTestCase):

View File

@ -62,6 +62,35 @@ class ResourceTestCase(test_v3.RestfulTestCase,
self.post('/domains', body={'domain': {}}, self.post('/domains', body={'domain': {}},
expected_status=http_client.BAD_REQUEST) expected_status=http_client.BAD_REQUEST)
def test_create_domain_unsafe(self):
"""Call ``POST /domains with unsafe names``."""
unsafe_name = 'i am not / safe'
self.config_fixture.config(group='resource',
domain_name_url_safe='off')
ref = unit.new_domain_ref(name=unsafe_name)
self.post(
'/domains',
body={'domain': ref})
self.config_fixture.config(group='resource',
domain_name_url_safe='new')
ref = unit.new_domain_ref(name=unsafe_name)
self.post(
'/domains',
body={'domain': ref},
expected_status=http_client.BAD_REQUEST)
def test_create_domain_unsafe_default(self):
"""Check default for unsafe names for``POST /domains ``."""
unsafe_name = 'i am not / safe'
# By default, we should be able to create unsafe names
ref = unit.new_domain_ref(name=unsafe_name)
self.post(
'/domains',
body={'domain': ref})
def test_list_domains(self): def test_list_domains(self):
"""Call ``GET /domains``.""" """Call ``GET /domains``."""
resource_url = '/domains' resource_url = '/domains'
@ -84,6 +113,39 @@ class ResourceTestCase(test_v3.RestfulTestCase,
body={'domain': ref}) body={'domain': ref})
self.assertValidDomainResponse(r, ref) self.assertValidDomainResponse(r, ref)
def test_update_domain_unsafe(self):
"""Call ``POST /domains/{domain_id} with unsafe names``."""
unsafe_name = 'i am not / safe'
self.config_fixture.config(group='resource',
domain_name_url_safe='off')
ref = unit.new_domain_ref(name=unsafe_name)
del ref['id']
self.patch('/domains/%(domain_id)s' % {
'domain_id': self.domain_id},
body={'domain': ref})
unsafe_name = 'i am still not / safe'
self.config_fixture.config(group='resource',
domain_name_url_safe='new')
ref = unit.new_domain_ref(name=unsafe_name)
del ref['id']
self.patch('/domains/%(domain_id)s' % {
'domain_id': self.domain_id},
body={'domain': ref},
expected_status=http_client.BAD_REQUEST)
def test_update_domain_unsafe_default(self):
"""Check default for unsafe names for``POST /domains ``."""
unsafe_name = 'i am not / safe'
# By default, we should be able to create unsafe names
ref = unit.new_domain_ref(name=unsafe_name)
del ref['id']
self.patch('/domains/%(domain_id)s' % {
'domain_id': self.domain_id},
body={'domain': ref})
def test_disable_domain(self): def test_disable_domain(self):
"""Call ``PATCH /domains/{domain_id}`` (set enabled=False).""" """Call ``PATCH /domains/{domain_id}`` (set enabled=False)."""
# Create a 2nd set of entities in a 2nd domain # Create a 2nd set of entities in a 2nd domain
@ -472,6 +534,35 @@ class ResourceTestCase(test_v3.RestfulTestCase,
self.post('/projects', body={'project': ref}, self.post('/projects', body={'project': ref},
expected_status=http_client.BAD_REQUEST) expected_status=http_client.BAD_REQUEST)
def test_create_project_unsafe(self):
"""Call ``POST /projects with unsafe names``."""
unsafe_name = 'i am not / safe'
self.config_fixture.config(group='resource',
project_name_url_safe='off')
ref = unit.new_project_ref(name=unsafe_name)
self.post(
'/projects',
body={'project': ref})
self.config_fixture.config(group='resource',
project_name_url_safe='new')
ref = unit.new_project_ref(name=unsafe_name)
self.post(
'/projects',
body={'project': ref},
expected_status=http_client.BAD_REQUEST)
def test_create_project_unsafe_default(self):
"""Check default for unsafe names for``POST /projects ``."""
unsafe_name = 'i am not / safe'
# By default, we should be able to create unsafe names
ref = unit.new_project_ref(name=unsafe_name)
self.post(
'/projects',
body={'project': ref})
def test_create_project_is_domain_not_allowed(self): def test_create_project_is_domain_not_allowed(self):
"""Call ``POST /projects``. """Call ``POST /projects``.
@ -950,6 +1041,45 @@ class ResourceTestCase(test_v3.RestfulTestCase,
body={'project': ref}) body={'project': ref})
self.assertValidProjectResponse(r, ref) self.assertValidProjectResponse(r, ref)
def test_update_project_unsafe(self):
"""Call ``POST /projects/{project_id} with unsafe names``."""
unsafe_name = 'i am not / safe'
self.config_fixture.config(group='resource',
project_name_url_safe='off')
ref = unit.new_project_ref(name=unsafe_name,
domain_id=self.domain_id)
del ref['id']
self.patch(
'/projects/%(project_id)s' % {
'project_id': self.project_id},
body={'project': ref})
unsafe_name = 'i am still not / safe'
self.config_fixture.config(group='resource',
project_name_url_safe='new')
ref = unit.new_project_ref(name=unsafe_name,
domain_id=self.domain_id)
del ref['id']
self.patch(
'/projects/%(project_id)s' % {
'project_id': self.project_id},
body={'project': ref},
expected_status=http_client.BAD_REQUEST)
def test_update_project_unsafe_default(self):
"""Check default for unsafe names for``POST /projects ``."""
unsafe_name = 'i am not / safe'
# By default, we should be able to create unsafe names
ref = unit.new_project_ref(name=unsafe_name,
domain_id=self.domain_id)
del ref['id']
self.patch(
'/projects/%(project_id)s' % {
'project_id': self.project_id},
body={'project': ref})
def test_update_project_domain_id(self): def test_update_project_domain_id(self):
"""Call ``PATCH /projects/{project_id}`` with domain_id.""" """Call ``PATCH /projects/{project_id}`` with domain_id."""
project = unit.new_project_ref(domain_id=self.domain['id']) project = unit.new_project_ref(domain_id=self.domain['id'])