
This adds auth functionality to the Auth Drop down. A new K2K django auth plugin has been added (With the intent to do K2K at Login Time). Session variables have been added so horizon can display the names of the Keystone Providers. An endpoint was also added that allows the user to switch keystone providers. Change-Id: I75b1a10a3b40b5544b60f6fdc060e0070c585977 Implements: blueprint k2k-horizon
362 lines
12 KiB
Python
362 lines
12 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import datetime
|
|
import uuid
|
|
|
|
from django.utils import datetime_safe
|
|
from keystoneauth1.access import access
|
|
from keystoneauth1.access import service_catalog
|
|
from keystoneclient.common import cms
|
|
from keystoneclient.v3 import domains
|
|
from keystoneclient.v3 import projects
|
|
from keystoneclient.v3 import roles
|
|
from keystoneclient.v3 import users
|
|
import requests
|
|
|
|
|
|
class TestDataContainer(object):
|
|
"""Arbitrary holder for test data in an object-oriented fashion."""
|
|
pass
|
|
|
|
|
|
class TestResponse(requests.Response):
|
|
"""Class used to wrap requests.Response.
|
|
|
|
It also provides some convenience to initialize with a dict.
|
|
"""
|
|
|
|
def __init__(self, data):
|
|
self._text = None
|
|
super(TestResponse, self).__init__()
|
|
if isinstance(data, dict):
|
|
self.status_code = data.get('status_code', 200)
|
|
self.headers = data.get('headers', None)
|
|
# Fake the text attribute to streamline Response creation
|
|
self._text = data.get('text', None)
|
|
else:
|
|
self.status_code = data
|
|
|
|
def __eq__(self, other):
|
|
return self.__dict__ == other.__dict__
|
|
|
|
@property
|
|
def text(self):
|
|
return self._text
|
|
|
|
|
|
def generate_test_data(pki=False, service_providers=False,
|
|
endpoint='localhost'):
|
|
'''Builds a set of test_data data as returned by Keystone V2.'''
|
|
test_data = TestDataContainer()
|
|
|
|
keystone_service = {
|
|
'type': 'identity',
|
|
'id': uuid.uuid4().hex,
|
|
'endpoints': [
|
|
{
|
|
'url': 'http://admin.%s:35357/v3' % endpoint,
|
|
'region': 'RegionOne',
|
|
'interface': 'admin',
|
|
'id': uuid.uuid4().hex,
|
|
},
|
|
{
|
|
'url': 'http://internal.%s:5000/v3' % endpoint,
|
|
'region': 'RegionOne',
|
|
'interface': 'internal',
|
|
'id': uuid.uuid4().hex
|
|
},
|
|
{
|
|
'url': 'http://public.%s:5000/v3' % endpoint,
|
|
'region': 'RegionOne',
|
|
'interface': 'public',
|
|
'id': uuid.uuid4().hex
|
|
}
|
|
]
|
|
}
|
|
|
|
# Domains
|
|
domain_dict = {'id': uuid.uuid4().hex,
|
|
'name': 'domain',
|
|
'description': '',
|
|
'enabled': True}
|
|
test_data.domain = domains.Domain(domains.DomainManager(None),
|
|
domain_dict, loaded=True)
|
|
|
|
# Users
|
|
user_dict = {'id': uuid.uuid4().hex,
|
|
'name': 'gabriel',
|
|
'email': 'gabriel@example.com',
|
|
'password': 'swordfish',
|
|
'domain_id': domain_dict['id'],
|
|
'token': '',
|
|
'enabled': True}
|
|
test_data.user = users.User(users.UserManager(None),
|
|
user_dict, loaded=True)
|
|
|
|
# Projects
|
|
project_dict_1 = {'id': uuid.uuid4().hex,
|
|
'name': 'tenant_one',
|
|
'description': '',
|
|
'domain_id': domain_dict['id'],
|
|
'enabled': True}
|
|
project_dict_2 = {'id': uuid.uuid4().hex,
|
|
'name': 'tenant_two',
|
|
'description': '',
|
|
'domain_id': domain_dict['id'],
|
|
'enabled': False}
|
|
test_data.project_one = projects.Project(projects.ProjectManager(None),
|
|
project_dict_1,
|
|
loaded=True)
|
|
test_data.project_two = projects.Project(projects.ProjectManager(None),
|
|
project_dict_2,
|
|
loaded=True)
|
|
|
|
# Roles
|
|
role_dict = {'id': uuid.uuid4().hex,
|
|
'name': 'Member'}
|
|
test_data.role = roles.Role(roles.RoleManager, role_dict)
|
|
|
|
nova_service = {
|
|
'type': 'compute',
|
|
'id': uuid.uuid4().hex,
|
|
'endpoints': [
|
|
{
|
|
'url': ('http://nova-admin.%s:8774/v2.0/%s'
|
|
% (endpoint, project_dict_1['id'])),
|
|
'region': 'RegionOne',
|
|
'interface': 'admin',
|
|
'id': uuid.uuid4().hex,
|
|
},
|
|
{
|
|
'url': ('http://nova-internal.%s:8774/v2.0/%s'
|
|
% (endpoint, project_dict_1['id'])),
|
|
'region': 'RegionOne',
|
|
'interface': 'internal',
|
|
'id': uuid.uuid4().hex
|
|
},
|
|
{
|
|
'url': ('http://nova-public.%s:8774/v2.0/%s'
|
|
% (endpoint, project_dict_1['id'])),
|
|
'region': 'RegionOne',
|
|
'interface': 'public',
|
|
'id': uuid.uuid4().hex
|
|
},
|
|
{
|
|
'url': ('http://nova2-admin.%s:8774/v2.0/%s'
|
|
% (endpoint, project_dict_1['id'])),
|
|
'region': 'RegionTwo',
|
|
'interface': 'admin',
|
|
'id': uuid.uuid4().hex,
|
|
},
|
|
{
|
|
'url': ('http://nova2-internal.%s:8774/v2.0/%s'
|
|
% (endpoint, project_dict_1['id'])),
|
|
'region': 'RegionTwo',
|
|
'interface': 'internal',
|
|
'id': uuid.uuid4().hex
|
|
},
|
|
{
|
|
'url': ('http://nova2-public.%s:8774/v2.0/%s'
|
|
% (endpoint, project_dict_1['id'])),
|
|
'region': 'RegionTwo',
|
|
'interface': 'public',
|
|
'id': uuid.uuid4().hex
|
|
}
|
|
]
|
|
}
|
|
|
|
# Tokens
|
|
tomorrow = datetime_safe.datetime.now() + datetime.timedelta(days=1)
|
|
expiration = datetime_safe.datetime.isoformat(tomorrow)
|
|
if pki:
|
|
# We don't need a real PKI token, but just the prefix to make the
|
|
# keystone client treat it as a PKI token
|
|
auth_token = cms.PKI_ASN1_PREFIX + uuid.uuid4().hex
|
|
else:
|
|
auth_token = uuid.uuid4().hex
|
|
|
|
auth_response_headers = {
|
|
'X-Subject-Token': auth_token
|
|
}
|
|
|
|
auth_response = TestResponse({
|
|
"headers": auth_response_headers
|
|
})
|
|
|
|
scoped_token_dict = {
|
|
'token': {
|
|
'methods': ['password'],
|
|
'expires_at': expiration,
|
|
'project': {
|
|
'id': project_dict_1['id'],
|
|
'name': project_dict_1['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
}
|
|
},
|
|
'user': {
|
|
'id': user_dict['id'],
|
|
'name': user_dict['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
}
|
|
},
|
|
'roles': [role_dict],
|
|
'catalog': [keystone_service, nova_service]
|
|
}
|
|
}
|
|
|
|
sp_list = None
|
|
if service_providers:
|
|
test_data.sp_auth_url = 'http://service_provider_endp:5000/v3'
|
|
test_data.service_provider_id = 'k2kserviceprovider'
|
|
# The access info for the identity provider
|
|
# should return a list of service providers
|
|
sp_list = [
|
|
{'auth_url': test_data.sp_auth_url,
|
|
'id': test_data.service_provider_id,
|
|
'sp_url': 'https://k2kserviceprovider/sp_url'}
|
|
]
|
|
scoped_token_dict['token']['service_providers'] = sp_list
|
|
|
|
test_data.scoped_access_info = access.create(
|
|
resp=auth_response,
|
|
body=scoped_token_dict
|
|
)
|
|
|
|
domain_token_dict = {
|
|
'token': {
|
|
'methods': ['password'],
|
|
'expires_at': expiration,
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name'],
|
|
},
|
|
'user': {
|
|
'id': user_dict['id'],
|
|
'name': user_dict['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
}
|
|
},
|
|
'roles': [role_dict],
|
|
'catalog': [keystone_service, nova_service]
|
|
}
|
|
}
|
|
test_data.domain_scoped_access_info = access.create(
|
|
resp=auth_response,
|
|
body=domain_token_dict
|
|
)
|
|
|
|
unscoped_token_dict = {
|
|
'token': {
|
|
'methods': ['password'],
|
|
'expires_at': expiration,
|
|
'user': {
|
|
'id': user_dict['id'],
|
|
'name': user_dict['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
}
|
|
},
|
|
'catalog': [keystone_service]
|
|
}
|
|
}
|
|
|
|
if service_providers:
|
|
unscoped_token_dict['token']['service_providers'] = sp_list
|
|
|
|
test_data.unscoped_access_info = access.create(
|
|
resp=auth_response,
|
|
body=unscoped_token_dict
|
|
)
|
|
|
|
# Service Catalog
|
|
test_data.service_catalog = service_catalog.ServiceCatalogV3(
|
|
[keystone_service, nova_service])
|
|
|
|
# federated user
|
|
federated_scoped_token_dict = {
|
|
'token': {
|
|
'methods': ['password'],
|
|
'expires_at': expiration,
|
|
'project': {
|
|
'id': project_dict_1['id'],
|
|
'name': project_dict_1['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
}
|
|
},
|
|
'user': {
|
|
'id': user_dict['id'],
|
|
'name': user_dict['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
},
|
|
'OS-FEDERATION': {
|
|
'identity_provider': 'ACME',
|
|
'protocol': 'OIDC',
|
|
'groups': [
|
|
{'id': uuid.uuid4().hex},
|
|
{'id': uuid.uuid4().hex}
|
|
]
|
|
}
|
|
},
|
|
'roles': [role_dict],
|
|
'catalog': [keystone_service, nova_service]
|
|
}
|
|
}
|
|
|
|
test_data.federated_scoped_access_info = access.create(
|
|
resp=auth_response,
|
|
body=federated_scoped_token_dict
|
|
)
|
|
|
|
federated_unscoped_token_dict = {
|
|
'token': {
|
|
'methods': ['password'],
|
|
'expires_at': expiration,
|
|
'user': {
|
|
'id': user_dict['id'],
|
|
'name': user_dict['name'],
|
|
'domain': {
|
|
'id': domain_dict['id'],
|
|
'name': domain_dict['name']
|
|
},
|
|
'OS-FEDERATION': {
|
|
'identity_provider': 'ACME',
|
|
'protocol': 'OIDC',
|
|
'groups': [
|
|
{'id': uuid.uuid4().hex},
|
|
{'id': uuid.uuid4().hex}
|
|
]
|
|
}
|
|
},
|
|
'catalog': [keystone_service]
|
|
}
|
|
}
|
|
|
|
test_data.federated_unscoped_access_info = access.create(
|
|
resp=auth_response,
|
|
body=federated_unscoped_token_dict
|
|
)
|
|
|
|
return test_data
|