Merge "Improve tests for api protection and filtering"

This commit is contained in:
Jenkins 2013-03-12 16:08:00 +00:00 committed by Gerrit Code Review
commit 45228caa73
4 changed files with 393 additions and 190 deletions

View File

@ -243,9 +243,28 @@ class V3Controller(V2Controller):
@classmethod
def filter_by_attribute(cls, context, refs, attr):
"""Filters a list of references by query string value."""
def _attr_match(ref_attr, val_attr):
"""Matches attributes allowing for booleans as strings.
We test explicitly for a value that defines it as 'False',
which also means that the existence of the attribute with
no value implies 'True'
"""
if type(ref_attr) is bool:
if (isinstance(val_attr, basestring) and
val_attr == '0'):
val = False
else:
val = True
return (ref_attr == val)
else:
return (ref_attr == val_attr)
if attr in context['query_string']:
value = context['query_string'][attr]
return [r for r in refs if r[attr] == value]
return [r for r in refs if _attr_match(r[attr], value)]
return refs
def _require_matching_id(self, value, ref):

View File

@ -20,9 +20,14 @@ TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
class RestfulTestCase(test_content_types.RestfulTestCase):
def setUp(self):
rules.reset()
def setUp(self, load_sample_data=True):
"""Setup for v3 Restful Test Cases.
If a child class wants to create their own sample data
and provide their own auth data to obtain tokens, then
load_sample_data should be set to false.
"""
self.config([
test.etcdir('keystone.conf.sample'),
test.testsdir('test_overrides.conf'),
@ -32,32 +37,33 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
sql_util.setup_test_database()
self.load_backends()
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(self.domain_id, self.domain)
if load_sample_data:
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(self.domain_id, self.domain)
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(self.project_id, self.project)
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(self.project_id, self.project)
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
# create & grant policy.json's default role for admin_required
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.role['name'] = 'admin'
self.identity_api.create_role(self.role_id, self.role)
self.identity_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
# create & grant policy.json's default role for admin_required
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.role['name'] = 'admin'
self.identity_api.create_role(self.role_id, self.role)
self.identity_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')
@ -752,6 +758,76 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
return entity
def build_auth_scope(self, project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
domain_id=None, domain_name=None, trust_id=None):
scope_data = {}
if project_id or project_name:
scope_data['project'] = {}
if project_id:
scope_data['project']['id'] = project_id
else:
scope_data['project']['name'] = project_name
if project_domain_id or project_domain_name:
project_domain_json = {}
if project_domain_id:
project_domain_json['id'] = project_domain_id
else:
project_domain_json['name'] = project_domain_name
scope_data['project']['domain'] = project_domain_json
if domain_id or domain_name:
scope_data['domain'] = {}
if domain_id:
scope_data['domain']['id'] = domain_id
else:
scope_data['domain']['name'] = domain_name
if trust_id:
scope_data['trust'] = {}
scope_data['trust']['id'] = trust_id
return scope_data
def build_password_auth(self, user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None):
password_data = {'user': {}}
if user_id:
password_data['user']['id'] = user_id
else:
password_data['user']['name'] = username
if user_domain_id or user_domain_name:
password_data['user']['domain'] = {}
if user_domain_id:
password_data['user']['domain']['id'] = user_domain_id
else:
password_data['user']['domain']['name'] = user_domain_name
password_data['user']['password'] = password
return password_data
def build_token_auth(self, token):
return {'id': token}
def build_authentication_request(self, token=None, user_id=None,
username=None, user_domain_id=None,
user_domain_name=None, password=None,
**kwargs):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
that it receives.
"""
auth_data = {}
auth_data['identity'] = {'methods': []}
if token:
auth_data['identity']['methods'].append('token')
auth_data['identity']['token'] = self.build_token_auth(token)
if user_id or username:
auth_data['identity']['methods'].append('password')
auth_data['identity']['password'] = self.build_password_auth(
user_id, username, user_domain_id, user_domain_name, password)
if kwargs:
auth_data['scope'] = self.build_auth_scope(**kwargs)
return {'auth': auth_data}
class VersionTestCase(RestfulTestCase):
def test_get_version(self):

View File

@ -29,80 +29,15 @@ import test_v3
CONF = config.CONF
def _build_auth_scope(project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
domain_id=None, domain_name=None, trust_id=None):
scope_data = {}
if project_id or project_name:
scope_data['project'] = {}
if project_id:
scope_data['project']['id'] = project_id
else:
scope_data['project']['name'] = project_name
if project_domain_id or project_domain_name:
project_domain_json = {}
if project_domain_id:
project_domain_json['id'] = project_domain_id
else:
project_domain_json['name'] = project_domain_name
scope_data['project']['domain'] = project_domain_json
if domain_id or domain_name:
scope_data['domain'] = {}
if domain_id:
scope_data['domain']['id'] = domain_id
else:
scope_data['domain']['name'] = domain_name
if trust_id:
scope_data['trust'] = {}
scope_data['trust']['id'] = trust_id
return scope_data
class TestAuthInfo(test_v3.RestfulTestCase):
# TDOD(henry-nash) These tests are somewhat inefficient, since by
# using the test_v3.RestfulTestCase class to gain access to the auth
# building helper functions, they cause backend databases and fixtures
# to be loaded unnecessarily. Separating out the helper functions from
# this base class would improve efficiency (Bug #1134836)
def setUp(self):
super(TestAuthInfo, self).setUp(load_sample_data=False)
def _build_password_auth(user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None):
password_data = {'user': {}}
if user_id:
password_data['user']['id'] = user_id
else:
password_data['user']['name'] = username
if user_domain_id or user_domain_name:
password_data['user']['domain'] = {}
if user_domain_id:
password_data['user']['domain']['id'] = user_domain_id
else:
password_data['user']['domain']['name'] = user_domain_name
password_data['user']['password'] = password
return password_data
def _build_token_auth(token):
return {'id': token}
def _build_authentication_request(token=None, user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None, **kwargs):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
that it receives.
"""
auth_data = {}
auth_data['identity'] = {'methods': []}
if token:
auth_data['identity']['methods'].append('token')
auth_data['identity']['token'] = _build_token_auth(token)
if user_id or username:
auth_data['identity']['methods'].append('password')
auth_data['identity']['password'] = _build_password_auth(
user_id, username, user_domain_id, user_domain_name, password)
if kwargs:
auth_data['scope'] = _build_auth_scope(**kwargs)
return {'auth': auth_data}
class TestAuthInfo(test.TestCase):
def test_missing_auth_methods(self):
auth_data = {'identity': {}}
auth_data['identity']['token'] = {'id': uuid.uuid4().hex}
@ -129,19 +64,21 @@ class TestAuthInfo(test.TestCase):
auth_data)
def test_project_name_no_domain(self):
auth_data = _build_authentication_request(username='test',
password='test',
project_name='abc')['auth']
auth_data = self.build_authentication_request(
username='test',
password='test',
project_name='abc')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
auth_data)
def test_both_project_and_domain_in_scope(self):
auth_data = _build_authentication_request(user_id='test',
password='test',
project_name='test',
domain_name='test')['auth']
auth_data = self.build_authentication_request(
user_id='test',
password='test',
project_name='test',
domain_name='test')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
@ -151,7 +88,7 @@ class TestAuthInfo(test.TestCase):
class TestTokenAPIs(test_v3.RestfulTestCase):
def setUp(self):
super(TestTokenAPIs, self).setUp()
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password'])
@ -165,7 +102,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
def test_v3_pki_token_id(self):
self.opt_in_group('signing', token_format='PKI')
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
@ -181,7 +118,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
self.opt_in_group('signing', token_format='UUID')
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
@ -208,7 +145,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
self.opt_in_group('signing', token_format='PKI')
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
@ -287,7 +224,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
def test_rescoping_token(self):
expires = self.token_data['token']['expires_at']
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
token=self.token,
project_id=self.project_id)
r = self.post('/auth/tokens', body=auth_data)
@ -316,14 +253,14 @@ class TestAuthJSON(test_v3.RestfulTestCase):
content_type = 'json'
def test_unscoped_token_with_user_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_with_user_domain_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'])
@ -331,7 +268,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_with_user_domain_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'])
@ -339,7 +276,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidUnscopedTokenResponse(r)
def test_project_id_scoped_token_with_user_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
@ -351,14 +288,14 @@ class TestAuthJSON(test_v3.RestfulTestCase):
project = self.new_project_ref(domain_id=self.domain_id)
self.identity_api.create_project(project_id, project)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_project_id_scoped_token_with_user_domain_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
@ -367,7 +304,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidProjectScopedTokenResponse(r)
def test_project_id_scoped_token_with_user_domain_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
@ -380,7 +317,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
@ -392,7 +329,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
@ -405,7 +342,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
@ -418,7 +355,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
@ -430,7 +367,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
@ -443,7 +380,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
@ -467,7 +404,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.put(path=path)
# now get a domain-scoped token
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
@ -480,7 +417,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
# now get a domain-scoped token
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
@ -488,14 +425,14 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_failed(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_auth_with_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
@ -505,45 +442,45 @@ class TestAuthJSON(test_v3.RestfulTestCase):
headers = {'X-Subject-Token': r.getheader('X-Subject-Token')}
# test token auth
auth_data = _build_authentication_request(token=token)
auth_data = self.build_authentication_request(token=token)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_invalid_user_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_user_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_domain_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_domain_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_password(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_remote_user(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])['auth']
api = auth.controllers.Auth()
@ -554,7 +491,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertEqual(auth_context['user_id'], self.user['id'])
def test_remote_user_no_domain(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'])['auth']
api = auth.controllers.Auth()
@ -701,7 +638,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
'trust_id': trust['id']},
expected_status=404)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
@ -720,7 +657,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
r = self.post('/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
@ -751,7 +688,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
r = self.post('/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
@ -793,7 +730,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
'trust_id': trust['id']},
expected_status=404)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])

View File

@ -22,54 +22,87 @@ import uuid
import nose.exc
from keystone import config
from keystone import exception
from keystone.openstack.common import jsonutils
from keystone.policy.backends import rules
import test_v3
CONF = config.CONF
DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class IdentityTestProtectedCase(test_v3.RestfulTestCase):
"""Test policy protection of a sample of v3 identity apis"""
def setUp(self):
super(IdentityTestProtectedCase, self).setUp()
"""Setup for Identity Protection Test Cases.
As well as the usual housekeeping, create a set of domains,
users, roles and projects for the subsequent tests:
- Three domains: A,B & C. C is disabled.
- DomainA has user1, DomainB has user2 and user3
- DomainA has group1 and group2, DomainB has group3
- User1 has a role on DomainA
Remember that there will also be a fourth domain in existence,
the default domain.
"""
# Ensure that test_v3.RestfulTestCase doesn't load its own
# sample data, which would make checking the results of our
# tests harder
super(IdentityTestProtectedCase, self).setUp(load_sample_data=False)
# Start by creating a couple of domains
self.domainA = self.new_domain_ref()
domainA_ref = self.identity_api.create_domain(self.domainA['id'],
self.domainA)
self.domainB = self.new_domain_ref()
domainB_ref = self.identity_api.create_domain(self.domainB['id'],
self.domainB)
self.domainC = self.new_domain_ref()
self.domainC['enabled'] = False
domainC_ref = self.identity_api.create_domain(self.domainC['id'],
self.domainC)
# Now create some users, one in domainA and two of them in domainB
self.user1 = self.new_user_ref(
domain_id=self.domainA['id'])
self.user1['password'] = uuid.uuid4().hex
user_ref = self.identity_api.create_user(self.user1['id'], self.user1)
user_ref = self.identity_api.create_user(self.user1['id'],
self.user1)
self.user2 = self.new_user_ref(
domain_id=self.domainB['id'])
self.user2['password'] = uuid.uuid4().hex
user_ref = self.identity_api.create_user(self.user2['id'], self.user2)
user_ref = self.identity_api.create_user(self.user2['id'],
self.user2)
self.user3 = self.new_user_ref(
domain_id=self.domainB['id'])
self.user3['password'] = uuid.uuid4().hex
user_ref = self.identity_api.create_user(self.user3['id'], self.user3)
user_ref = self.identity_api.create_user(self.user3['id'],
self.user3)
self.project = self.new_project_ref(
self.group1 = self.new_group_ref(
domain_id=self.domainA['id'])
project_ref = self.identity_api.create_project(self.project['id'],
self.project)
user_ref = self.identity_api.create_group(self.group1['id'],
self.group1)
self.group2 = self.new_group_ref(
domain_id=self.domainA['id'])
user_ref = self.identity_api.create_group(self.group2['id'],
self.group2)
self.group3 = self.new_group_ref(
domain_id=self.domainB['id'])
user_ref = self.identity_api.create_group(self.group3['id'],
self.group3)
self.role = self.new_role_ref()
self.identity_api.create_role(self.role['id'], self.role)
self.identity_api.add_role_to_user_and_project(self.user1['id'],
self.project['id'],
self.role['id'])
self.identity_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
@ -82,15 +115,9 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.opt(policy_file=self.tmpfilename)
# A default auth request we can use - un-scoped user token
self.auth = {}
self.auth['identity'] = {'methods': []}
self.auth['identity']['methods'].append('password')
self.auth['identity']['password'] = {'user': {}}
self.auth['identity']['password']['user']['id'] = (
self.user1['id'])
self.auth['identity']['password']['user']['password'] = (
self.user1['password'])
self.auth = {'auth': self.auth}
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'])
def tearDown(self):
super(IdentityTestProtectedCase, self).tearDown()
@ -103,13 +130,20 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
result_list.append(x['id'])
return result_list
def test_list_users_unprotected(self):
"""GET /users (unprotected)"""
# Make sure we get all the users back if no protection
# or filtering
def _set_policy(self, new_policy):
with open(self.tmpfilename, "w") as policyfile:
policyfile.write("""{"identity:list_users": []}""")
policyfile.write(jsonutils.dumps(new_policy))
def test_list_users_unprotected(self):
"""GET /users (unprotected)
Test Plan:
- Update policy so api is unprotected
- Use an un-scoped token to make sure we can get back all
the users independent of domain
"""
self._set_policy({"identity:list_users": []})
r = self.get('/users', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
self.assertIn(self.user1['id'], id_list)
@ -117,11 +151,15 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.assertIn(self.user3['id'], id_list)
def test_list_users_filtered_by_domain(self):
"""GET /users?domain_id=mydomain """
"""GET /users?domain_id=mydomain (filtered)
# Using no protection, make sure filtering works
with open(self.tmpfilename, "w") as policyfile:
policyfile.write("""{"identity:list_users": []}""")
Test Plan:
- Update policy so api is unprotected
- Use an un-scoped token to make sure we can filter the
users by domainB, getting back the 2 users in that domain
"""
self._set_policy({"identity:list_users": []})
url_by_name = '/users?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth)
# We should get back two users, those in DomainB
@ -129,28 +167,161 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
def test_list_users_protected_by_domain(self):
"""GET /users?domain_id=mydomain (protected)"""
# Update policy to protect by domain, and then use a domain
# scoped token
new_policy = """{"identity:list_users": ["domain_id:%(domain_id)s"]}"""
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(new_policy)
self.auth['auth']['scope'] = {'domain': {'id': self.domainA['id']}}
url_by_name = '/users?domain_id=%s' % self.user1['domain_id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
self.assertIn(self.user1['id'], id_list)
def test_get_user_protected_match_id(self):
"""GET /users/{id} (match payload)"""
# Tests the flattening of the payload
policy = {"identity:get_user": [["user_id:%(user_id)s"]]}
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(json.dumps(policy))
"""GET /users/{id} (match payload)
Test Plan:
- Update policy to protect api by user_id
- List users with user_id of user1 as filter, to check that
this will correctly match user_id in the flattened
payload
"""
# TODO (henry-nash, ayoung): It would be good to expand this
# test for further test flattening, e.g. protect on, say, an
# attribute of an object being created
new_policy = {"identity:get_user": [["user_id:%(user_id)s"]]}
self._set_policy(new_policy)
url_by_name = '/users/%s' % self.user1['id']
r = self.get(url_by_name, auth=self.auth)
body = r.body
self.assertEquals(self.user1['id'], body['user']['id'])
def test_list_users_protected_by_domain(self):
"""GET /users?domain_id=mydomain (protected)
Test Plan:
- Update policy to protect api by domain_id
- List groups using a token scoped to domainA with a filter
specifying domainA - we should only get back the one user
that is in domainA.
- Try and read the users from domainB - this should fail since
we don't have a token scoped for domainB
"""
new_policy = {"identity:list_users": ["domain_id:%(domain_id)s"]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/users?domain_id=%s' % self.domainA['id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.user1['id'], id_list)
# Now try for domainB, which should fail
url_by_name = '/users?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
def test_list_groups_protected_by_domain(self):
"""GET /groups?domain_id=mydomain (protected)
Test Plan:
- Update policy to protect api by domain_id
- List groups using a token scoped to domainA and make sure
we only get back the two groups that are in domainA
- Try and read the groups from domainB - this should fail since
we don't have a token scoped for domainB
"""
new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/groups?domain_id=%s' % self.domainA['id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back two groups, the ones in DomainA
id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
self.assertEqual(len(id_list), 2)
self.assertIn(self.group1['id'], id_list)
self.assertIn(self.group2['id'], id_list)
# Now try for domainB, which should fail
url_by_name = '/groups?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
def test_list_groups_protected_by_domain_and_filtered(self):
"""GET /groups?domain_id=mydomain&name=myname (protected)
Test Plan:
- Update policy to protect api by domain_id
- List groups using a token scoped to domainA with a filter
specifying both domainA and the name of group.
- We should only get back the group in domainA that matches
the name
"""
new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/groups?domain_id=%s&name=%s' % (
self.domainA['id'], self.group2['name'])
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA that matches
# the name supplied
id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.group2['id'], id_list)
def test_list_filtered_domains(self):
"""GET /domains?enabled=0
Test Plan:
- Update policy for no protection on api
- Filter by the 'enabled' boolean to get disabled domains, which
should return just domainC
- Try the filter using different ways of specifying 'true'
to test that our handling of booleans in filter matching is
correct
"""
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
r = self.get('/domains?enabled=0', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainC['id'], id_list)
# Now try a few ways of specifying 'true' when we should get back
# the other two domains, plus the default domain
r = self.get('/domains?enabled=1', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(DEFAULT_DOMAIN_ID, id_list)
r = self.get('/domains?enabled', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(DEFAULT_DOMAIN_ID, id_list)
def test_multiple_filters(self):
"""GET /domains?enabled&name=myname
Test Plan:
- Update policy for no protection on api
- Filter by the 'enabled' boolean and name - this should
return a single domain
"""
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
my_url = '/domains?enableds&name=%s' % self.domainA['name']
r = self.get(my_url, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainA['id'], id_list)