Create authentication specific routes
These routes are purely based on your current authentication and bridge the gap between what is available in the standard identity-api for fetching scope targets based on user_id and what is required for the federation APIs. Implement /auth/projects /auth/domains and move /catalog to /auth/catalog Change-Id: I464c0ca5cc9f250d593340e9563de45b077dd4cd Implements: blueprint auth-specific-data
This commit is contained in:
parent
a8b844251b
commit
eb25fc6424
@ -25,8 +25,6 @@
|
||||
"identity:update_endpoint": "rule:admin_required",
|
||||
"identity:delete_endpoint": "rule:admin_required",
|
||||
|
||||
"identity:get_catalog": "",
|
||||
|
||||
"identity:get_domain": "rule:admin_required",
|
||||
"identity:list_domains": "rule:admin_required",
|
||||
"identity:create_domain": "rule:admin_required",
|
||||
@ -139,6 +137,10 @@
|
||||
"identity:delete_mapping": "rule:admin_required",
|
||||
"identity:update_mapping": "rule:admin_required",
|
||||
|
||||
"identity:get_auth_catalog": "",
|
||||
"identity:get_auth_projects": "",
|
||||
"identity:get_auth_domains": "",
|
||||
|
||||
"identity:list_projects_for_groups": "",
|
||||
"identity:list_domains_for_groups": "",
|
||||
|
||||
|
@ -28,8 +28,6 @@
|
||||
"identity:update_endpoint": "rule:cloud_admin",
|
||||
"identity:delete_endpoint": "rule:cloud_admin",
|
||||
|
||||
"identity:get_catalog": "",
|
||||
|
||||
"identity:get_domain": "rule:cloud_admin",
|
||||
"identity:list_domains": "rule:cloud_admin",
|
||||
"identity:create_domain": "rule:cloud_admin",
|
||||
@ -152,6 +150,10 @@
|
||||
"identity:delete_mapping": "rule:admin_required",
|
||||
"identity:update_mapping": "rule:admin_required",
|
||||
|
||||
"identity:get_auth_catalog": "",
|
||||
"identity:get_auth_projects": "",
|
||||
"identity:get_auth_domains": "",
|
||||
|
||||
"identity:list_projects_for_groups": "",
|
||||
"identity:list_domains_for_groups": "",
|
||||
|
||||
|
@ -205,6 +205,9 @@ class Assignment(kvs.Base, assignment.Driver):
|
||||
|
||||
return project_refs
|
||||
|
||||
def list_domains_for_user(self, user_id, group_ids, hints):
|
||||
raise exception.NotImplemented()
|
||||
|
||||
def get_roles_for_groups(self, group_ids, project_id=None, domain_id=None):
|
||||
raise exception.NotImplemented()
|
||||
|
||||
|
@ -162,6 +162,9 @@ class Assignment(assignment.Driver):
|
||||
def list_projects_for_groups(self, group_ids):
|
||||
raise exception.NotImplemented()
|
||||
|
||||
def list_domains_for_user(self, user_id, group_ids, hints):
|
||||
raise exception.NotImplemented()
|
||||
|
||||
def list_domains_for_groups(self, group_ids):
|
||||
raise exception.NotImplemented()
|
||||
|
||||
|
@ -289,6 +289,28 @@ class Assignment(keystone_assignment.Driver):
|
||||
|
||||
return _project_ids_to_dicts(session, project_ids)
|
||||
|
||||
def list_domains_for_user(self, user_id, group_ids, hints):
|
||||
with sql.transaction() as session:
|
||||
query = session.query(Domain)
|
||||
query = query.join(RoleAssignment,
|
||||
Domain.id == RoleAssignment.target_id)
|
||||
filters = []
|
||||
|
||||
if user_id:
|
||||
filters.append(sqlalchemy.and_(
|
||||
RoleAssignment.actor_id == user_id,
|
||||
RoleAssignment.type == AssignmentType.USER_DOMAIN))
|
||||
if group_ids:
|
||||
filters.append(sqlalchemy.and_(
|
||||
RoleAssignment.actor_id.in_(group_ids),
|
||||
RoleAssignment.type == AssignmentType.GROUP_DOMAIN))
|
||||
|
||||
if not filters:
|
||||
return []
|
||||
|
||||
query = query.filter(sqlalchemy.or_(*filters))
|
||||
return [ref.to_dict() for ref in query.all()]
|
||||
|
||||
def get_roles_for_groups(self, group_ids, project_id=None, domain_id=None):
|
||||
|
||||
if project_id is not None:
|
||||
|
@ -70,6 +70,12 @@ class Manager(manager.Manager):
|
||||
|
||||
super(Manager, self).__init__(assignment_driver)
|
||||
|
||||
def _get_group_ids_for_user_id(self, user_id):
|
||||
# TODO(morganfainberg): Implement a way to get only group_ids
|
||||
# instead of the more expensive to_dict() call for each record.
|
||||
return [x['id'] for
|
||||
x in self.identity_api.list_groups_for_user(user_id)]
|
||||
|
||||
@notifications.created(_PROJECT)
|
||||
def create_project(self, tenant_id, tenant):
|
||||
tenant = tenant.copy()
|
||||
@ -151,10 +157,7 @@ class Manager(manager.Manager):
|
||||
|
||||
"""
|
||||
def _get_group_project_roles(user_id, project_ref):
|
||||
# TODO(morganfainberg): Implement a way to get only group_ids
|
||||
# instead of the more expensive to_dict() call for each record.
|
||||
group_ids = [group['id'] for group in
|
||||
self.identity_api.list_groups_for_user(user_id)]
|
||||
group_ids = self._get_group_ids_for_user_id(user_id)
|
||||
return self.driver.get_group_project_roles(
|
||||
group_ids,
|
||||
project_ref['id'],
|
||||
@ -199,10 +202,10 @@ class Manager(manager.Manager):
|
||||
|
||||
def _get_group_domain_roles(user_id, domain_id):
|
||||
role_list = []
|
||||
group_refs = self.identity_api.list_groups_for_user(user_id)
|
||||
for x in group_refs:
|
||||
group_ids = self._get_group_ids_for_user_id(user_id)
|
||||
for group_id in group_ids:
|
||||
try:
|
||||
metadata_ref = self._get_metadata(group_id=x['id'],
|
||||
metadata_ref = self._get_metadata(group_id=group_id,
|
||||
domain_id=domain_id)
|
||||
role_list += self._roles_from_role_dicts(
|
||||
metadata_ref.get('roles', {}), False)
|
||||
@ -289,9 +292,7 @@ class Manager(manager.Manager):
|
||||
# list here and pass it in. The rest of the detailed logic of listing
|
||||
# projects for a user is pushed down into the driver to enable
|
||||
# optimization with the various backend technologies (SQL, LDAP etc.).
|
||||
|
||||
group_ids = [x['id'] for
|
||||
x in self.identity_api.list_groups_for_user(user_id)]
|
||||
group_ids = self._get_group_ids_for_user_id(user_id)
|
||||
return self.driver.list_projects_for_user(
|
||||
user_id, group_ids, hints or driver_hints.Hints())
|
||||
|
||||
@ -319,6 +320,19 @@ class Manager(manager.Manager):
|
||||
def list_domains(self, hints=None):
|
||||
return self.driver.list_domains(hints or driver_hints.Hints())
|
||||
|
||||
# TODO(henry-nash): We might want to consider list limiting this at some
|
||||
# point in the future.
|
||||
def list_domains_for_user(self, user_id, hints=None):
|
||||
# NOTE(henry-nash): In order to get a complete list of user domains,
|
||||
# the driver will need to look at group assignments. To avoid cross
|
||||
# calling between the assignment and identity driver we get the group
|
||||
# list here and pass it in. The rest of the detailed logic of listing
|
||||
# projects for a user is pushed down into the driver to enable
|
||||
# optimization with the various backend technologies (SQL, LDAP etc.).
|
||||
group_ids = self._get_group_ids_for_user_id(user_id)
|
||||
return self.driver.list_domains_for_user(
|
||||
user_id, group_ids, hints or driver_hints.Hints())
|
||||
|
||||
@notifications.disabled('domain', public=False)
|
||||
def _disable_domain(self, domain_id):
|
||||
self.token_api.delete_tokens_for_domain(domain_id)
|
||||
@ -894,6 +908,22 @@ class Driver(object):
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def list_domains_for_user(self, user_id, group_ids, hints):
|
||||
"""List all domains associated with a given user.
|
||||
|
||||
:param user_id: the user in question
|
||||
:param group_ids: the groups this user is a member of. This list is
|
||||
built in the Manager, so that the driver itself
|
||||
does not have to call across to identity.
|
||||
:param hints: filter hints which the driver should
|
||||
implement if at all possible.
|
||||
|
||||
:returns: a list of domain_refs or an empty list.
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def list_domains_for_groups(self, group_ids):
|
||||
"""List domains accessible to specified groups.
|
||||
|
@ -18,6 +18,8 @@ from keystoneclient.common import cms
|
||||
from oslo.utils import timeutils
|
||||
import six
|
||||
|
||||
from keystone.assignment import controllers as assignment_controllers
|
||||
from keystone.common import authorization
|
||||
from keystone.common import controller
|
||||
from keystone.common import dependency
|
||||
from keystone.common import wsgi
|
||||
@ -338,8 +340,8 @@ class AuthInfo(object):
|
||||
self._scope_data = (domain_id, project_id, trust)
|
||||
|
||||
|
||||
@dependency.requires('assignment_api', 'identity_api', 'token_provider_api',
|
||||
'trust_api')
|
||||
@dependency.requires('assignment_api', 'catalog_api', 'identity_api',
|
||||
'token_provider_api', 'trust_api')
|
||||
class Auth(controller.V3Controller):
|
||||
|
||||
# Note(atiwari): From V3 auth controller code we are
|
||||
@ -537,6 +539,84 @@ class Auth(controller.V3Controller):
|
||||
|
||||
return {'signed': signed_text}
|
||||
|
||||
def get_auth_context(self, context):
|
||||
# TODO(dolphm): this method of accessing the auth context is terrible,
|
||||
# but context needs to be refactored to always have reasonable values.
|
||||
env_context = context.get('environment', {})
|
||||
return env_context.get(authorization.AUTH_CONTEXT_ENV, {})
|
||||
|
||||
def _combine_lists_uniquely(self, a, b):
|
||||
# it's most likely that only one of these will be filled so avoid
|
||||
# the combination if possible.
|
||||
if a and b:
|
||||
return dict((x['id'], x) for x in a + b).values()
|
||||
else:
|
||||
return a or b
|
||||
|
||||
@controller.protected()
|
||||
def get_auth_projects(self, context):
|
||||
auth_context = self.get_auth_context(context)
|
||||
|
||||
user_id = auth_context.get('user_id')
|
||||
user_refs = []
|
||||
if user_id:
|
||||
try:
|
||||
user_refs = self.assignment_api.list_projects_for_user(user_id)
|
||||
except exception.UserNotFound:
|
||||
# federated users have an id but they don't link to anything
|
||||
pass
|
||||
|
||||
group_ids = auth_context.get('group_ids')
|
||||
grp_refs = []
|
||||
if group_ids:
|
||||
grp_refs = self.assignment_api.list_projects_for_groups(group_ids)
|
||||
|
||||
refs = self._combine_lists_uniquely(user_refs, grp_refs)
|
||||
return assignment_controllers.ProjectV3.wrap_collection(context, refs)
|
||||
|
||||
@controller.protected()
|
||||
def get_auth_domains(self, context):
|
||||
auth_context = self.get_auth_context(context)
|
||||
|
||||
user_id = auth_context.get('user_id')
|
||||
user_refs = []
|
||||
if user_id:
|
||||
try:
|
||||
user_refs = self.assignment_api.list_domains_for_user(user_id)
|
||||
except exception.UserNotFound:
|
||||
# federated users have an id but they don't link to anything
|
||||
pass
|
||||
|
||||
group_ids = auth_context.get('group_ids')
|
||||
grp_refs = []
|
||||
if group_ids:
|
||||
grp_refs = self.assignment_api.list_domains_for_groups(group_ids)
|
||||
|
||||
refs = self._combine_lists_uniquely(user_refs, grp_refs)
|
||||
return assignment_controllers.DomainV3.wrap_collection(context, refs)
|
||||
|
||||
@controller.protected()
|
||||
def get_auth_catalog(self, context):
|
||||
auth_context = self.get_auth_context(context)
|
||||
user_id = auth_context.get('user_id')
|
||||
project_id = auth_context.get('project_id')
|
||||
|
||||
if not project_id:
|
||||
raise exception.Forbidden(
|
||||
_('A project-scoped token is required to produce a service '
|
||||
'catalog.'))
|
||||
|
||||
# The V3Controller base methods mostly assume that you're returning
|
||||
# either a collection or a single element from a collection, neither of
|
||||
# which apply to the catalog. Because this is a special case, this
|
||||
# re-implements a tiny bit of work done by the base controller (such as
|
||||
# self-referential link building) to avoid overriding or refactoring
|
||||
# several private methods.
|
||||
return {
|
||||
'catalog': self.catalog_api.get_v3_catalog(user_id, project_id),
|
||||
'links': {'self': self.base_url(context, path='auth/catalog')}
|
||||
}
|
||||
|
||||
|
||||
# FIXME(gyee): not sure if it belongs here or keystone.common. Park it here
|
||||
# for now.
|
||||
|
@ -37,3 +37,18 @@ class Routers(wsgi.RoutersBase):
|
||||
mapper, auth_controller,
|
||||
path='/auth/tokens/OS-PKI/revoked',
|
||||
get_action='revocation_list')
|
||||
|
||||
self._add_resource(
|
||||
mapper, auth_controller,
|
||||
path='/auth/catalog',
|
||||
get_action='get_auth_catalog')
|
||||
|
||||
self._add_resource(
|
||||
mapper, auth_controller,
|
||||
path='/auth/projects',
|
||||
get_action='get_auth_projects')
|
||||
|
||||
self._add_resource(
|
||||
mapper, auth_controller,
|
||||
path='/auth/domains',
|
||||
get_action='get_auth_domains')
|
||||
|
@ -17,7 +17,6 @@ import uuid
|
||||
|
||||
import six
|
||||
|
||||
from keystone.common import authorization
|
||||
from keystone.common import controller
|
||||
from keystone.common import dependency
|
||||
from keystone.common import wsgi
|
||||
@ -139,36 +138,6 @@ class Endpoint(controller.V2Controller):
|
||||
raise exception.EndpointNotFound(endpoint_id=endpoint_id)
|
||||
|
||||
|
||||
@dependency.requires('catalog_api')
|
||||
class CatalogV3(controller.V3Controller):
|
||||
collection_name = 'catalog'
|
||||
|
||||
@controller.protected()
|
||||
def get_catalog(self, context):
|
||||
# TODO(dolphm): this method of accessing the auth context is terrible,
|
||||
# but context needs to be refactored to always have reasonable values.
|
||||
env_context = context.get('environment', {})
|
||||
auth_context = env_context.get(authorization.AUTH_CONTEXT_ENV, {})
|
||||
user_id = auth_context.get('user_id')
|
||||
project_id = auth_context.get('project_id')
|
||||
|
||||
if not user_id or not project_id:
|
||||
raise exception.Forbidden(
|
||||
_('A project-scoped token is required to produce a service '
|
||||
'catalog.'))
|
||||
|
||||
# The V3Controller base methods mostly assume that you're returning
|
||||
# either a collection or a single element from a collection, neither of
|
||||
# which apply to the catalog. Because this is a special case, this
|
||||
# re-implements a tiny bit of work done by the base controller (such as
|
||||
# self-referential link building) to avoid overriding or refactoring
|
||||
# several private methods.
|
||||
return {
|
||||
'catalog': self.catalog_api.get_v3_catalog(user_id, project_id),
|
||||
'links': {
|
||||
'self': CatalogV3.base_url(context)}}
|
||||
|
||||
|
||||
@dependency.requires('catalog_api')
|
||||
class RegionV3(controller.V3Controller):
|
||||
collection_name = 'regions'
|
||||
|
@ -35,8 +35,3 @@ class Routers(wsgi.RoutersBase):
|
||||
'services', 'service'))
|
||||
routers.append(router.Router(controllers.EndpointV3(),
|
||||
'endpoints', 'endpoint'))
|
||||
|
||||
self._add_resource(
|
||||
mapper, controllers.CatalogV3(),
|
||||
path='/catalog',
|
||||
get_action='get_catalog')
|
||||
|
@ -311,6 +311,66 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
|
||||
self.assertNotIn('default_project_id', user_ref)
|
||||
session.close()
|
||||
|
||||
def test_list_domains_for_user(self):
|
||||
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(domain['id'], domain)
|
||||
user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
|
||||
'domain_id': domain['id'], 'enabled': True}
|
||||
|
||||
test_domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(test_domain1['id'], test_domain1)
|
||||
test_domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(test_domain2['id'], test_domain2)
|
||||
|
||||
user = self.identity_api.create_user(user)
|
||||
user_domains = self.assignment_api.list_domains_for_user(user['id'])
|
||||
self.assertEqual(0, len(user_domains))
|
||||
self.assignment_api.create_grant(user_id=user['id'],
|
||||
domain_id=test_domain1['id'],
|
||||
role_id=self.role_member['id'])
|
||||
self.assignment_api.create_grant(user_id=user['id'],
|
||||
domain_id=test_domain2['id'],
|
||||
role_id=self.role_member['id'])
|
||||
user_domains = self.assignment_api.list_domains_for_user(user['id'])
|
||||
self.assertThat(user_domains, matchers.HasLength(2))
|
||||
|
||||
def test_list_domains_for_user_with_grants(self):
|
||||
# Create two groups each with a role on a different domain, and
|
||||
# make user1 a member of both groups. Both these new domains
|
||||
# should now be included, along with any direct user grants.
|
||||
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(domain['id'], domain)
|
||||
user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
|
||||
'domain_id': domain['id'], 'enabled': True}
|
||||
user = self.identity_api.create_user(user)
|
||||
group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
|
||||
group1 = self.identity_api.create_group(group1)
|
||||
group2 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
|
||||
group2 = self.identity_api.create_group(group2)
|
||||
|
||||
test_domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(test_domain1['id'], test_domain1)
|
||||
test_domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(test_domain2['id'], test_domain2)
|
||||
test_domain3 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_domain(test_domain3['id'], test_domain3)
|
||||
|
||||
self.identity_api.add_user_to_group(user['id'], group1['id'])
|
||||
self.identity_api.add_user_to_group(user['id'], group2['id'])
|
||||
|
||||
# Create 3 grants, one user grant, the other two as group grants
|
||||
self.assignment_api.create_grant(user_id=user['id'],
|
||||
domain_id=test_domain1['id'],
|
||||
role_id=self.role_member['id'])
|
||||
self.assignment_api.create_grant(group_id=group1['id'],
|
||||
domain_id=test_domain2['id'],
|
||||
role_id=self.role_admin['id'])
|
||||
self.assignment_api.create_grant(group_id=group2['id'],
|
||||
domain_id=test_domain3['id'],
|
||||
role_id=self.role_admin['id'])
|
||||
user_domains = self.assignment_api.list_domains_for_user(user['id'])
|
||||
self.assertThat(user_domains, matchers.HasLength(3))
|
||||
|
||||
|
||||
class SqlTrust(SqlTests, test_backend.TrustTests):
|
||||
pass
|
||||
|
@ -742,7 +742,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
|
||||
self.assertIsInstance(resp.json['links'], dict)
|
||||
self.assertEqual(['self'], resp.json['links'].keys())
|
||||
self.assertEqual(
|
||||
'http://localhost/v3/catalog',
|
||||
'http://localhost/v3/auth/catalog',
|
||||
resp.json['links']['self'])
|
||||
|
||||
def assertValidCatalog(self, entity):
|
||||
|
@ -19,6 +19,7 @@ import uuid
|
||||
|
||||
from keystoneclient.common import cms
|
||||
from oslo.utils import timeutils
|
||||
from testtools import matchers
|
||||
from testtools import testcase
|
||||
|
||||
from keystone import auth
|
||||
@ -3412,3 +3413,56 @@ class TestAuthContext(tests.TestCase):
|
||||
self.auth_context[attr_name] = attr_val_1
|
||||
self.auth_context[attr_name] = attr_val_2
|
||||
self.assertEqual(attr_val_2, self.auth_context[attr_name])
|
||||
|
||||
|
||||
class TestAuthSpecificData(test_v3.RestfulTestCase):
|
||||
|
||||
def test_get_catalog_project_scoped_token(self):
|
||||
"""Call ``GET /auth/catalog`` with a project-scoped token."""
|
||||
r = self.get(
|
||||
'/auth/catalog',
|
||||
expected_status=200)
|
||||
self.assertValidCatalogResponse(r)
|
||||
|
||||
def test_get_catalog_domain_scoped_token(self):
|
||||
"""Call ``GET /auth/catalog`` with a domain-scoped token."""
|
||||
# grant a domain role to a user
|
||||
self.put(path='/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id']))
|
||||
|
||||
self.get(
|
||||
'/auth/catalog',
|
||||
auth=self.build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id']),
|
||||
expected_status=403)
|
||||
|
||||
def test_get_catalog_unscoped_token(self):
|
||||
"""Call ``GET /auth/catalog`` with an unscoped token."""
|
||||
self.get(
|
||||
'/auth/catalog',
|
||||
auth=self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password']),
|
||||
expected_status=403)
|
||||
|
||||
def test_get_catalog_no_token(self):
|
||||
"""Call ``GET /auth/catalog`` without a token."""
|
||||
self.get(
|
||||
'/auth/catalog',
|
||||
noauth=True,
|
||||
expected_status=401)
|
||||
|
||||
def test_get_projects_project_scoped_token(self):
|
||||
r = self.get('/auth/projects', expected_status=200)
|
||||
self.assertThat(r.json['projects'], matchers.HasLength(1))
|
||||
self.assertValidProjectListResponse(r)
|
||||
|
||||
def test_get_domains_project_scoped_token(self):
|
||||
self.put(path='/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id']))
|
||||
|
||||
r = self.get('/auth/domains', expected_status=200)
|
||||
self.assertThat(r.json['domains'], matchers.HasLength(1))
|
||||
self.assertValidDomainListResponse(r)
|
||||
|
@ -24,43 +24,6 @@ from keystone.tests import test_v3
|
||||
class CatalogTestCase(test_v3.RestfulTestCase):
|
||||
"""Test service & endpoint CRUD."""
|
||||
|
||||
def test_get_catalog_project_scoped_token(self):
|
||||
"""Call ``GET /catalog`` with a project-scoped token."""
|
||||
r = self.get(
|
||||
'/catalog',
|
||||
expected_status=200)
|
||||
self.assertValidCatalogResponse(r)
|
||||
|
||||
def test_get_catalog_domain_scoped_token(self):
|
||||
"""Call ``GET /catalog`` with a domain-scoped token."""
|
||||
# grant a domain role to a user
|
||||
self.put(path='/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id']))
|
||||
|
||||
self.get(
|
||||
'/catalog',
|
||||
auth=self.build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id']),
|
||||
expected_status=403)
|
||||
|
||||
def test_get_catalog_unscoped_token(self):
|
||||
"""Call ``GET /catalog`` with an unscoped token."""
|
||||
self.get(
|
||||
'/catalog',
|
||||
auth=self.build_authentication_request(
|
||||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password']),
|
||||
expected_status=403)
|
||||
|
||||
def test_get_catalog_no_token(self):
|
||||
"""Call ``GET /catalog`` without a token."""
|
||||
self.get(
|
||||
'/catalog',
|
||||
noauth=True,
|
||||
expected_status=401)
|
||||
|
||||
# region crud tests
|
||||
|
||||
def test_create_region_with_id(self):
|
||||
|
@ -1022,7 +1022,7 @@ class FederatedTokenTests(FederationTests):
|
||||
self._check_scoped_token_attributes(token_resp)
|
||||
|
||||
def test_list_projects(self):
|
||||
url = '/OS-FEDERATION/projects'
|
||||
urls = ('/OS-FEDERATION/projects', '/auth/projects')
|
||||
|
||||
token = (self.tokens['CUSTOMER_ASSERTION'],
|
||||
self.tokens['EMPLOYEE_ASSERTION'],
|
||||
@ -1036,13 +1036,15 @@ class FederatedTokenTests(FederationTests):
|
||||
self.proj_customers['id']]))
|
||||
|
||||
for token, projects_ref in zip(token, projects_refs):
|
||||
r = self.get(url, token=token)
|
||||
projects_resp = r.result['projects']
|
||||
projects = set(p['id'] for p in projects_resp)
|
||||
self.assertEqual(projects, projects_ref)
|
||||
for url in urls:
|
||||
r = self.get(url, token=token)
|
||||
projects_resp = r.result['projects']
|
||||
projects = set(p['id'] for p in projects_resp)
|
||||
self.assertEqual(projects, projects_ref,
|
||||
'match failed for url %s' % url)
|
||||
|
||||
def test_list_domains(self):
|
||||
url = '/OS-FEDERATION/domains'
|
||||
urls = ('/OS-FEDERATION/domains', '/auth/domains')
|
||||
|
||||
tokens = (self.tokens['CUSTOMER_ASSERTION'],
|
||||
self.tokens['EMPLOYEE_ASSERTION'],
|
||||
@ -1056,10 +1058,12 @@ class FederatedTokenTests(FederationTests):
|
||||
self.domainC['id']]))
|
||||
|
||||
for token, domains_ref in zip(tokens, domain_refs):
|
||||
r = self.get(url, token=token)
|
||||
domains_resp = r.result['domains']
|
||||
domains = set(p['id'] for p in domains_resp)
|
||||
self.assertEqual(domains, domains_ref)
|
||||
for url in urls:
|
||||
r = self.get(url, token=token)
|
||||
domains_resp = r.result['domains']
|
||||
domains = set(p['id'] for p in domains_resp)
|
||||
self.assertEqual(domains, domains_ref,
|
||||
'match failed for url %s' % url)
|
||||
|
||||
def test_full_workflow(self):
|
||||
"""Test 'standard' workflow for granting access tokens.
|
||||
|
Loading…
Reference in New Issue
Block a user