Validate domain for DB-based domain config. CRUD

For CRUD of Identity API-based domain-specific driver configuration
in database store, if the client provides an invalid domain id,
the request shall be rejected with a response 404 domain not found.

Change-Id: I9e861d450da1a49d31bac08bea12a0e2e84c6476
Closes-Bug: 1524562
This commit is contained in:
Thomas Hsiao 2015-12-11 15:55:19 -08:00 committed by guang-yee
parent f9e8a9bee6
commit 5560c7060d
2 changed files with 195 additions and 0 deletions

View File

@ -161,11 +161,13 @@ class DomainV3(controller.V3Controller):
@dependency.requires('domain_config_api') @dependency.requires('domain_config_api')
@dependency.requires('resource_api')
class DomainConfigV3(controller.V3Controller): class DomainConfigV3(controller.V3Controller):
member_name = 'config' member_name = 'config'
@controller.protected() @controller.protected()
def create_domain_config(self, context, domain_id, config): def create_domain_config(self, context, domain_id, config):
self.resource_api.get_domain(domain_id)
original_config = ( original_config = (
self.domain_config_api.get_config_with_sensitive_info(domain_id)) self.domain_config_api.get_config_with_sensitive_info(domain_id))
ref = self.domain_config_api.create_config(domain_id, config) ref = self.domain_config_api.create_config(domain_id, config)
@ -178,27 +180,32 @@ class DomainConfigV3(controller.V3Controller):
@controller.protected() @controller.protected()
def get_domain_config(self, context, domain_id, group=None, option=None): def get_domain_config(self, context, domain_id, group=None, option=None):
self.resource_api.get_domain(domain_id)
ref = self.domain_config_api.get_config(domain_id, group, option) ref = self.domain_config_api.get_config(domain_id, group, option)
return {self.member_name: ref} return {self.member_name: ref}
@controller.protected() @controller.protected()
def update_domain_config( def update_domain_config(
self, context, domain_id, config, group, option): self, context, domain_id, config, group, option):
self.resource_api.get_domain(domain_id)
ref = self.domain_config_api.update_config( ref = self.domain_config_api.update_config(
domain_id, config, group, option) domain_id, config, group, option)
return wsgi.render_response(body={self.member_name: ref}) return wsgi.render_response(body={self.member_name: ref})
def update_domain_config_group(self, context, domain_id, group, config): def update_domain_config_group(self, context, domain_id, group, config):
self.resource_api.get_domain(domain_id)
return self.update_domain_config( return self.update_domain_config(
context, domain_id, config, group, option=None) context, domain_id, config, group, option=None)
def update_domain_config_only(self, context, domain_id, config): def update_domain_config_only(self, context, domain_id, config):
self.resource_api.get_domain(domain_id)
return self.update_domain_config( return self.update_domain_config(
context, domain_id, config, group=None, option=None) context, domain_id, config, group=None, option=None)
@controller.protected() @controller.protected()
def delete_domain_config( def delete_domain_config(
self, context, domain_id, group=None, option=None): self, context, domain_id, group=None, option=None):
self.resource_api.get_domain(domain_id)
self.domain_config_api.delete_config(domain_id, group, option) self.domain_config_api.delete_config(domain_id, group, option)

View File

@ -46,6 +46,19 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.assertEqual(self.config, r.result['config']) self.assertEqual(self.config, r.result['config'])
self.assertEqual(self.config, res) self.assertEqual(self.config, res)
def test_create_config_invalid_domain(self):
"""Call ``PUT /domains/{domain_id}/config``
While creating Identity API-based domain config with an invalid domain
id provided, the request shall be rejected with a response, 404 domain
not found.
"""
invalid_domain_id = uuid.uuid4().hex
url = '/domains/%(domain_id)s/config' % {
'domain_id': invalid_domain_id}
self.put(url, body={'config': self.config},
expected_status=exception.DomainNotFound.code)
def test_create_config_twice(self): def test_create_config_twice(self):
"""Check multiple creates don't throw error""" """Check multiple creates don't throw error"""
self.put('/domains/%(domain_id)s/config' % { self.put('/domains/%(domain_id)s/config' % {
@ -66,6 +79,19 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id']}, 'domain_id': self.domain['id']},
expected_status=exception.DomainConfigNotFound.code) expected_status=exception.DomainConfigNotFound.code)
def test_delete_config_invalid_domain(self):
"""Call ``DELETE /domains{domain_id}/config``
While deleting Identity API-based domain config with an invalid domain
id provided, the request shall be rejected with a response, 404 domain
not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
invalid_domain_id = uuid.uuid4().hex
self.delete('/domains/%(domain_id)s/config' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_delete_config_by_group(self): def test_delete_config_by_group(self):
"""Call ``DELETE /domains{domain_id}/config/{group}``.""" """Call ``DELETE /domains{domain_id}/config/{group}``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -74,6 +100,19 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
res = self.domain_config_api.get_config(self.domain['id']) res = self.domain_config_api.get_config(self.domain['id'])
self.assertNotIn('ldap', res) self.assertNotIn('ldap', res)
def test_delete_config_by_group_invalid_domain(self):
"""Call ``DELETE /domains{domain_id}/config/{group}``
While deleting Identity API-based domain config by group with an
invalid domain id provided, the request shall be rejected with a
response 404 domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
invalid_domain_id = uuid.uuid4().hex
self.delete('/domains/%(domain_id)s/config/ldap' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_get_head_config(self): def test_get_head_config(self):
"""Call ``GET & HEAD for /domains{domain_id}/config``.""" """Call ``GET & HEAD for /domains{domain_id}/config``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -92,6 +131,19 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.assertEqual({'ldap': self.config['ldap']}, r.result['config']) self.assertEqual({'ldap': self.config['ldap']}, r.result['config'])
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http_client.OK)
def test_get_config_by_group_invalid_domain(self):
"""Call ``GET & HEAD /domains{domain_id}/config/{group}``
While retrieving Identity API-based domain config by group with an
invalid domain id provided, the request shall be rejected with a
response 404 domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
invalid_domain_id = uuid.uuid4().hex
self.get('/domains/%(domain_id)s/config/ldap' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_get_config_by_option(self): def test_get_config_by_option(self):
"""Call ``GET & HEAD /domains{domain_id}/config/{group}/{option}``.""" """Call ``GET & HEAD /domains{domain_id}/config/{group}/{option}``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -102,12 +154,37 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
r.result['config']) r.result['config'])
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http_client.OK)
def test_get_config_by_option_invalid_domain(self):
"""Call ``GET & HEAD /domains{domain_id}/config/{group}/{option}``
While retrieving Identity API-based domain config by option with an
invalid domain id provided, the request shall be rejected with a
response 404 domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
invalid_domain_id = uuid.uuid4().hex
self.get('/domains/%(domain_id)s/config/ldap/url' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_get_non_existant_config(self): def test_get_non_existant_config(self):
"""Call ``GET /domains{domain_id}/config when no config defined``.""" """Call ``GET /domains{domain_id}/config when no config defined``."""
self.get('/domains/%(domain_id)s/config' % { self.get('/domains/%(domain_id)s/config' % {
'domain_id': self.domain['id']}, 'domain_id': self.domain['id']},
expected_status=http_client.NOT_FOUND) expected_status=http_client.NOT_FOUND)
def test_get_non_existant_config_invalid_domain(self):
"""Call ``GET /domains{domain_id}/config when no config defined``
While retrieving non-existent Identity API-based domain config with an
invalid domain id provided, the request shall be rejected with a
response 404 domain not found.
"""
invalid_domain_id = uuid.uuid4().hex
self.get('/domains/%(domain_id)s/config' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_get_non_existant_config_group(self): def test_get_non_existant_config_group(self):
"""Call ``GET /domains{domain_id}/config/{group_not_exist}``.""" """Call ``GET /domains{domain_id}/config/{group_not_exist}``."""
config = {'ldap': {'url': uuid.uuid4().hex}} config = {'ldap': {'url': uuid.uuid4().hex}}
@ -116,6 +193,20 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id']}, 'domain_id': self.domain['id']},
expected_status=http_client.NOT_FOUND) expected_status=http_client.NOT_FOUND)
def test_get_non_existant_config_group_invalid_domain(self):
"""Call ``GET /domains{domain_id}/config/{group_not_exist}``
While retrieving non-existent Identity API-based domain config group
with an invalid domain id provided, the request shall be rejected with
a response, 404 domain not found.
"""
config = {'ldap': {'url': uuid.uuid4().hex}}
self.domain_config_api.create_config(self.domain['id'], config)
invalid_domain_id = uuid.uuid4().hex
self.get('/domains/%(domain_id)s/config/identity' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_get_non_existant_config_option(self): def test_get_non_existant_config_option(self):
"""Call ``GET /domains{domain_id}/config/group/{option_not_exist}``.""" """Call ``GET /domains{domain_id}/config/group/{option_not_exist}``."""
config = {'ldap': {'url': uuid.uuid4().hex}} config = {'ldap': {'url': uuid.uuid4().hex}}
@ -124,6 +215,20 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id']}, 'domain_id': self.domain['id']},
expected_status=http_client.NOT_FOUND) expected_status=http_client.NOT_FOUND)
def test_get_non_existant_config_option_invalid_domain(self):
"""Call ``GET /domains{domain_id}/config/group/{option_not_exist}``
While retrieving non-existent Identity API-based domain config option
with an invalid domain id provided, the request shall be rejected with
a response, 404 domain not found.
"""
config = {'ldap': {'url': uuid.uuid4().hex}}
self.domain_config_api.create_config(self.domain['id'], config)
invalid_domain_id = uuid.uuid4().hex
self.get('/domains/%(domain_id)s/config/ldap/user_tree_dn' % {
'domain_id': invalid_domain_id},
expected_status=exception.DomainNotFound.code)
def test_update_config(self): def test_update_config(self):
"""Call ``PATCH /domains/{domain_id}/config``.""" """Call ``PATCH /domains/{domain_id}/config``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -140,6 +245,22 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.assertEqual(expected_config, r.result['config']) self.assertEqual(expected_config, r.result['config'])
self.assertEqual(expected_config, res) self.assertEqual(expected_config, res)
def test_update_config_invalid_domain(self):
"""Call ``PATCH /domains/{domain_id}/config``
While updating Identity API-based domain config with an invalid domain
id provided, the request shall be rejected with a response, 404 domain
not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
new_config = {'ldap': {'url': uuid.uuid4().hex},
'identity': {'driver': uuid.uuid4().hex}}
invalid_domain_id = uuid.uuid4().hex
self.patch('/domains/%(domain_id)s/config' % {
'domain_id': invalid_domain_id},
body={'config': new_config},
expected_status=exception.DomainNotFound.code)
def test_update_config_group(self): def test_update_config_group(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}``.""" """Call ``PATCH /domains/{domain_id}/config/{group}``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -156,6 +277,22 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.assertEqual(expected_config, r.result['config']) self.assertEqual(expected_config, r.result['config'])
self.assertEqual(expected_config, res) self.assertEqual(expected_config, res)
def test_update_config_group_invalid_domain(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}``
While updating Identity API-based domain config group with an invalid
domain id provided, the request shall be rejected with a response,
404 domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
new_config = {'ldap': {'url': uuid.uuid4().hex,
'user_filter': uuid.uuid4().hex}}
invalid_domain_id = uuid.uuid4().hex
self.patch('/domains/%(domain_id)s/config/ldap' % {
'domain_id': invalid_domain_id},
body={'config': new_config},
expected_status=exception.DomainNotFound.code)
def test_update_config_invalid_group(self): def test_update_config_invalid_group(self):
"""Call ``PATCH /domains/{domain_id}/config/{invalid_group}``.""" """Call ``PATCH /domains/{domain_id}/config/{invalid_group}``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -179,6 +316,24 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
body={'config': new_config}, body={'config': new_config},
expected_status=http_client.NOT_FOUND) expected_status=http_client.NOT_FOUND)
def test_update_config_invalid_group_invalid_domain(self):
"""Call ``PATCH /domains/{domain_id}/config/{invalid_group}``
While updating Identity API-based domain config with an invalid group
and an invalid domain id provided, the request shall be rejected
with a response, 404 domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
invalid_group = uuid.uuid4().hex
new_config = {invalid_group: {'url': uuid.uuid4().hex,
'user_filter': uuid.uuid4().hex}}
invalid_domain_id = uuid.uuid4().hex
self.patch('/domains/%(domain_id)s/config/%(invalid_group)s' % {
'domain_id': invalid_domain_id,
'invalid_group': invalid_group},
body={'config': new_config},
expected_status=exception.DomainNotFound.code)
def test_update_config_option(self): def test_update_config_option(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}/{option}``.""" """Call ``PATCH /domains/{domain_id}/config/{group}/{option}``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -192,6 +347,21 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.assertEqual(expected_config, r.result['config']) self.assertEqual(expected_config, r.result['config'])
self.assertEqual(expected_config, res) self.assertEqual(expected_config, res)
def test_update_config_option_invalid_domain(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}/{option}``
While updating Identity API-based domain config option with an invalid
domain id provided, the request shall be rejected with a response, 404
domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
new_config = {'url': uuid.uuid4().hex}
invalid_domain_id = uuid.uuid4().hex
self.patch('/domains/%(domain_id)s/config/ldap/url' % {
'domain_id': invalid_domain_id},
body={'config': new_config},
expected_status=exception.DomainNotFound.code)
def test_update_config_invalid_option(self): def test_update_config_invalid_option(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}/{invalid}``.""" """Call ``PATCH /domains/{domain_id}/config/{group}/{invalid}``."""
self.domain_config_api.create_config(self.domain['id'], self.config) self.domain_config_api.create_config(self.domain['id'], self.config)
@ -213,3 +383,21 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id']}, 'domain_id': self.domain['id']},
body={'config': new_config}, body={'config': new_config},
expected_status=http_client.NOT_FOUND) expected_status=http_client.NOT_FOUND)
def test_update_config_invalid_option_invalid_domain(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}/{invalid}``
While updating Identity API-based domain config with an invalid option
and an invalid domain id provided, the request shall be rejected
with a response, 404 domain not found.
"""
self.domain_config_api.create_config(self.domain['id'], self.config)
invalid_option = uuid.uuid4().hex
new_config = {'ldap': {invalid_option: uuid.uuid4().hex}}
invalid_domain_id = uuid.uuid4().hex
self.patch(
'/domains/%(domain_id)s/config/ldap/%(invalid_option)s' % {
'domain_id': invalid_domain_id,
'invalid_option': invalid_option},
body={'config': new_config},
expected_status=exception.DomainNotFound.code)