Add capability for Keystone V3 Authentication.

For multi-domain model, set OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT
to True and the login form will prompt the user for Domain name.

For single-domain model, set OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT
to False. The application will use the name of the default domain in
OPENSTACK_KEYSTONE_DEFAULT_DOMAIN to login.

Cleanup the unused Tenant field in the login form.

Implements blueprint login-domain-support
This commit is contained in:
Lin Hua Cheng
2013-07-03 14:25:33 -07:00
parent 74f800684a
commit b63d876974
11 changed files with 953 additions and 255 deletions

View File

@@ -1,18 +1,18 @@
""" Module defining the Django auth backend class for the Keystone API. """
import hashlib
import logging
from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from keystoneclient.v2_0 import client as keystone_client
from keystoneclient import exceptions as keystone_exceptions
from keystoneclient.v2_0.tokens import Token, TokenManager
from .exceptions import KeystoneAuthException
from .user import create_user_from_token
from .utils import check_token_expiration, is_ans1_token
from .user import Token
from .utils import check_token_expiration
from .utils import get_keystone_client
from .utils import get_keystone_version
LOG = logging.getLogger(__name__)
@@ -22,11 +22,12 @@ KEYSTONE_CLIENT_ATTR = "_keystoneclient"
class KeystoneBackend(object):
"""Django authentication backend class for use with
``django.contrib.auth``.
"""
Django authentication backend class for use with ``django.contrib.auth``.
"""
def check_auth_expiry(self, token):
if not check_token_expiration(token):
def check_auth_expiry(self, auth_ref):
if not check_token_expiration(auth_ref):
msg = _("The authentication token issued by the Identity service "
"has expired.")
LOG.warning("The authentication token issued by the Identity "
@@ -37,41 +38,41 @@ class KeystoneBackend(object):
return True
def get_user(self, user_id):
"""
Returns the current user (if authenticated) based on the user ID
"""Returns the current user (if authenticated) based on the user ID
and session data.
Note: this required monkey-patching the ``contrib.auth`` middleware
to make the ``request`` object available to the auth backend class.
"""
if user_id == self.request.session["user_id"]:
token = Token(TokenManager(None),
self.request.session['token'],
loaded=True)
token = self.request.session['token']
endpoint = self.request.session['region_endpoint']
services_region = self.request.session['services_region']
return create_user_from_token(self.request, token, endpoint,
user = create_user_from_token(self.request, token, endpoint,
services_region)
return user
else:
return None
def authenticate(self, request=None, username=None, password=None,
tenant=None, auth_url=None):
""" Authenticates a user via the Keystone Identity API. """
user_domain_name=None, auth_url=None):
"""Authenticates a user via the Keystone Identity API. """
LOG.debug('Beginning user authentication for user "%s".' % username)
insecure = getattr(settings, 'OPENSTACK_SSL_NO_VERIFY', False)
keystone_client = get_keystone_client()
try:
client = keystone_client.Client(username=username,
password=password,
tenant_id=tenant,
auth_url=auth_url,
insecure=insecure)
unscoped_token_data = {"token": client.service_catalog.get_token()}
unscoped_token = Token(TokenManager(None),
unscoped_token_data,
loaded=True)
client = keystone_client.Client(
user_domain_name=user_domain_name,
username=username,
password=password,
auth_url=auth_url,
insecure=insecure,
debug=settings.DEBUG)
unscoped_auth_ref = client.auth_ref
unscoped_token = Token(auth_ref=unscoped_auth_ref)
except (keystone_exceptions.Unauthorized,
keystone_exceptions.Forbidden,
keystone_exceptions.NotFound) as exc:
@@ -85,54 +86,60 @@ class KeystoneBackend(object):
LOG.debug(exc.message)
raise KeystoneAuthException(msg)
# Check expiry for our unscoped token.
self.check_auth_expiry(unscoped_token)
# Check expiry for our unscoped auth ref.
self.check_auth_expiry(unscoped_auth_ref)
# FIXME: Log in to default tenant when the Keystone API returns it...
# For now we list all the user's tenants and iterate through.
try:
tenants = client.tenants.list()
except (keystone_exceptions.ClientException,
keystone_exceptions.AuthorizationFailure):
msg = _('Unable to retrieve authorized projects.')
raise KeystoneAuthException(msg)
# Abort if there are no tenants for this user
if not tenants:
msg = _('You are not authorized for any projects.')
raise KeystoneAuthException(msg)
while tenants:
tenant = tenants.pop()
# Check if token is automatically scoped to default_project
if unscoped_auth_ref.project_scoped:
auth_ref = unscoped_auth_ref
else:
# For now we list all the user's projects and iterate through.
try:
client = keystone_client.Client(tenant_id=tenant.id,
token=unscoped_token.id,
auth_url=auth_url,
insecure=insecure)
token = client.tokens.authenticate(username=username,
token=unscoped_token.id,
tenant_id=tenant.id)
break
if get_keystone_version() < 3:
projects = client.tenants.list()
else:
client.management_url = auth_url
projects = client.projects.list(
user=unscoped_auth_ref.user_id)
except (keystone_exceptions.ClientException,
keystone_exceptions.AuthorizationFailure):
token = None
keystone_exceptions.AuthorizationFailure) as exc:
msg = _('Unable to retrieve authorized projects.')
raise KeystoneAuthException(msg)
if token is None:
msg = _("Unable to authenticate to any available projects.")
raise KeystoneAuthException(msg)
# Abort if there are no projects for this user
if not projects:
msg = _('You are not authorized for any projects.')
raise KeystoneAuthException(msg)
while projects:
project = projects.pop()
try:
client = keystone_client.Client(
tenant_id=project.id,
token=unscoped_auth_ref.auth_token,
auth_url=auth_url,
insecure=insecure,
debug=settings.DEBUG)
auth_ref = client.auth_ref
break
except (keystone_exceptions.ClientException,
keystone_exceptions.AuthorizationFailure):
auth_ref = None
if auth_ref is None:
msg = _("Unable to authenticate to any available"
" projects.")
raise KeystoneAuthException(msg)
# Check expiry for our new scoped token.
self.check_auth_expiry(token)
self.check_auth_expiry(auth_ref)
# If we made it here we succeeded. Create our User!
user = create_user_from_token(request,
token,
Token(auth_ref),
client.service_catalog.url_for())
if request is not None:
if is_ans1_token(unscoped_token.id):
hashed_token = hashlib.md5(unscoped_token.id).hexdigest()
unscoped_token._info['token']['id'] = hashed_token
request.session['unscoped_token'] = unscoped_token.id
request.user = user
@@ -143,15 +150,17 @@ class KeystoneBackend(object):
return user
def get_group_permissions(self, user, obj=None):
""" Returns an empty set since Keystone doesn't support "groups". """
"""Returns an empty set since Keystone doesn't support "groups". """
# Keystone V3 added "groups". The Auth token response includes the
# roles from the user's Group assignment. It should be fine just
# returning an empty set here.
return set()
def get_all_permissions(self, user, obj=None):
"""
Returns a set of permission strings that this user has through his/her
Keystone "roles".
"""Returns a set of permission strings that this user has through
his/her Keystone "roles".
The permissions are returned as ``"openstack.{{ role.name }}"``.
The permissions are returned as ``"openstack.{{ role.name }}"``.
"""
if user.is_anonymous() or obj is not None:
return set()
@@ -163,16 +172,15 @@ class KeystoneBackend(object):
return role_perms | service_perms
def has_perm(self, user, perm, obj=None):
""" Returns True if the given user has the specified permission. """
"""Returns True if the given user has the specified permission. """
if not user.is_active:
return False
return perm in self.get_all_permissions(user, obj)
def has_module_perms(self, user, app_label):
"""
Returns True if user has any permissions in the given app_label.
"""Returns True if user has any permissions in the given app_label.
Currently this matches for the app_label ``"openstack"``.
Currently this matches for the app_label ``"openstack"``.
"""
if not user.is_active:
return False

View File

@@ -16,8 +16,18 @@ LOG = logging.getLogger(__name__)
class Login(AuthenticationForm):
""" Form used for logging in a user.
Handles authentication with Keystone, choosing a tenant, and fetching
a scoped token token for that tenant.
Handles authentication with Keystone by providing the domain name, username
and password. A scoped token is fetched after successful authentication.
A domain name is required if authenticating with Keystone V3 running
multi-domain configuration.
If the user authenticated has a default project set, the token will be
automatically scoped to their default project.
If the user authenticated has no default project set, the authentication
backend will try to scope to the projects returned from the user's assigned
projects. The first successful project scoped will be returned.
Inherits from the base ``django.contrib.auth.forms.AuthenticationForm``
class for added security features.
@@ -26,10 +36,16 @@ class Login(AuthenticationForm):
username = forms.CharField(label=_("User Name"))
password = forms.CharField(label=_("Password"),
widget=forms.PasswordInput(render_value=False))
tenant = forms.CharField(required=False, widget=forms.HiddenInput())
def __init__(self, *args, **kwargs):
super(Login, self).__init__(*args, **kwargs)
self.fields.keyOrder = ['username', 'password', 'region']
if getattr(settings,
'OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT',
False):
self.fields['domain'] = forms.CharField(label=_("Domain"),
required=True)
self.fields.keyOrder = ['domain', 'username', 'password', 'region']
self.fields['region'].choices = self.get_region_choices()
if len(self.fields['region'].choices) == 1:
self.fields['region'].initial = self.fields['region'].choices[0][0]
@@ -42,13 +58,13 @@ class Login(AuthenticationForm):
@sensitive_variables()
def clean(self):
default_domain = getattr(settings,
'OPENSTACK_KEYSTONE_DEFAULT_DOMAIN',
'Default')
username = self.cleaned_data.get('username')
password = self.cleaned_data.get('password')
region = self.cleaned_data.get('region')
tenant = self.cleaned_data.get('tenant')
if not tenant:
tenant = None
domain = self.cleaned_data.get('domain', default_domain)
if not (username and password):
# Don't authenticate, just let the other validators handle it.
@@ -58,7 +74,7 @@ class Login(AuthenticationForm):
self.user_cache = authenticate(request=self.request,
username=username,
password=password,
tenant=tenant,
user_domain_name=domain,
auth_url=region)
msg = 'Login successful for user "%(username)s".' % \
{'username': username}

View File

@@ -4,12 +4,11 @@ from datetime import timedelta
from django.utils import datetime_safe
from keystoneclient.access import AccessInfo
from keystoneclient.service_catalog import ServiceCatalog
from keystoneclient.v2_0.roles import Role, RoleManager
from keystoneclient.v2_0.tenants import Tenant, TenantManager
from keystoneclient.v2_0.tokens import Token, TokenManager
from keystoneclient.v2_0.users import User, UserManager
from keystoneclient.service_catalog import ServiceCatalog
from keystoneclient import access
class TestDataContainer(object):
@@ -18,7 +17,7 @@ class TestDataContainer(object):
def generate_test_data():
''' Builds a set of test_data data as returned by Keystone. '''
''' Builds a set of test_data data as returned by Keystone V2. '''
test_data = TestDataContainer()
keystone_service = {
@@ -96,49 +95,49 @@ def generate_test_data():
expiration = datetime_safe.datetime.isoformat(tomorrow)
scoped_token_dict = {
'token': {
'id': uuid.uuid4().hex,
'expires': expiration,
'tenant': tenant_dict_1,
'tenants': [tenant_dict_1, tenant_dict_2]},
'user': {
'id': user_dict['id'],
'name': user_dict['name'],
'roles': [role_dict]},
'serviceCatalog': [keystone_service, nova_service]
'access': {
'token': {
'id': uuid.uuid4().hex,
'expires': expiration,
'tenant': tenant_dict_1,
'tenants': [tenant_dict_1, tenant_dict_2]},
'user': {
'id': user_dict['id'],
'name': user_dict['name'],
'roles': [role_dict]},
'serviceCatalog': [keystone_service, nova_service]
}
}
test_data.scoped_token = Token(TokenManager(None),
scoped_token_dict,
loaded=True)
test_data.scoped_access_info = AccessInfo.factory(
resp=None,
body=scoped_token_dict)
unscoped_token_dict = {
'token': {
'id': uuid.uuid4().hex,
'expires': expiration},
'user': {
'id': user_dict['id'],
'name': user_dict['name'],
'roles': [role_dict]},
'serviceCatalog': [keystone_service]
'access': {
'token': {
'id': uuid.uuid4().hex,
'expires': expiration},
'user': {
'id': user_dict['id'],
'name': user_dict['name'],
'roles': [role_dict]},
'serviceCatalog': [keystone_service]
}
}
test_data.unscoped_token = Token(TokenManager(None),
unscoped_token_dict,
loaded=True)
test_data.unscoped_access_info = AccessInfo.factory(
resp=None,
body=unscoped_token_dict)
# Service Catalog
test_data.service_catalog = ServiceCatalog.factory({
'serviceCatalog': [keystone_service, nova_service],
'token': {
'id': scoped_token_dict['token']['id'],
'expires': scoped_token_dict['token']['expires'],
'id': scoped_token_dict['access']['token']['id'],
'expires': scoped_token_dict['access']['token']['expires'],
'user_id': user_dict['id'],
'tenant_id': tenant_dict_1['id']
}
})
versioned_scoped_toked_dict = scoped_token_dict
versioned_scoped_toked_dict['version'] = 'v2.0'
test_data.access_info = access.AccessInfo(versioned_scoped_toked_dict)
return test_data

View File

@@ -0,0 +1,233 @@
import requests
import uuid
from datetime import timedelta
from django.utils import datetime_safe
from keystoneclient.access import AccessInfo
from keystoneclient.service_catalog import ServiceCatalog
from keystoneclient.v3.domains import Domain, DomainManager
from keystoneclient.v3.roles import Role, RoleManager
from keystoneclient.v3.projects import Project, ProjectManager
from keystoneclient.v3.users import User, UserManager
class TestDataContainer(object):
""" Arbitrary holder for test data in an object-oriented fashion. """
pass
class TestResponse(requests.Response):
""" Class used to wrap requests.Response and provide some
convenience to initialize with a dict """
def __init__(self, data):
self._text = None
super(TestResponse, self)
if isinstance(data, dict):
self.status_code = data.get('status_code', None)
self.headers = data.get('headers', None)
# Fake the text attribute to streamline Response creation
self._text = data.get('text', None)
else:
self.status_code = data
def __eq__(self, other):
return self.__dict__ == other.__dict__
@property
def text(self):
return self._text
def generate_test_data():
''' Builds a set of test_data data as returned by Keystone V2. '''
test_data = TestDataContainer()
keystone_service = {
'type': 'identity',
'id': uuid.uuid4().hex,
'endpoints': [
{
'url': 'http://admin.localhost:35357/v3',
'region': 'RegionOne',
'interface': 'admin',
'id': uuid.uuid4().hex,
},
{
'url': 'http://internal.localhost:5000/v3',
'region': 'RegionOne',
'interface': 'internal',
'id': uuid.uuid4().hex
},
{
'url':'http://public.localhost:5000/v3',
'region':'RegionOne',
'interface': 'public',
'id': uuid.uuid4().hex
}
]
}
# Domains
domain_dict = {'id': uuid.uuid4().hex,
'name': 'domain',
'description': '',
'enabled': True}
test_data.domain = Domain(DomainManager(None), domain_dict, loaded=True)
# Users
user_dict = {'id': uuid.uuid4().hex,
'name': 'gabriel',
'email': 'gabriel@example.com',
'password': 'swordfish',
'domain_id': domain_dict['id'],
'token': '',
'enabled': True}
test_data.user = User(UserManager(None), user_dict, loaded=True)
# Projects
project_dict_1 = {'id': uuid.uuid4().hex,
'name': 'tenant_one',
'description': '',
'domain_id': domain_dict['id'],
'enabled': True}
project_dict_2 = {'id': uuid.uuid4().hex,
'name': '',
'description': '',
'domain_id': domain_dict['id'],
'enabled': False}
test_data.project_one = Project(ProjectManager(None),
project_dict_1,
loaded=True)
test_data.project_two = Project(ProjectManager(None),
project_dict_2,
loaded=True)
# Roles
role_dict = {'id': uuid.uuid4().hex,
'name': 'Member'}
test_data.role = Role(RoleManager, role_dict)
nova_service = {
'type': 'compute',
'id': uuid.uuid4().hex,
'endpoints': [
{
'url': 'http://nova-admin.localhost:8774/v2.0/%s' \
% (project_dict_1['id']),
'region': 'RegionOne',
'interface': 'admin',
'id': uuid.uuid4().hex,
},
{
'url': 'http://nova-internal.localhost:8774/v2.0/%s' \
% (project_dict_1['id']),
'region': 'RegionOne',
'interface': 'internal',
'id': uuid.uuid4().hex
},
{
'url':'http://nova-public.localhost:8774/v2.0/%s' \
% (project_dict_1['id']),
'region':'RegionOne',
'interface': 'public',
'id': uuid.uuid4().hex
},
{
'url': 'http://nova2-admin.localhost:8774/v2.0/%s' \
% (project_dict_1['id']),
'region': 'RegionTwo',
'interface': 'admin',
'id': uuid.uuid4().hex,
},
{
'url': 'http://nova2-internal.localhost:8774/v2.0/%s' \
% (project_dict_1['id']),
'region': 'RegionTwo',
'interface': 'internal',
'id': uuid.uuid4().hex
},
{
'url':'http://nova2-public.localhost:8774/v2.0/%s' \
% (project_dict_1['id']),
'region':'RegionTwo',
'interface': 'public',
'id': uuid.uuid4().hex
}
]
}
# Tokens
tomorrow = datetime_safe.datetime.now() + timedelta(days=1)
expiration = datetime_safe.datetime.isoformat(tomorrow)
auth_token = uuid.uuid4().hex
auth_response_headers = {
'X-Subject-Token': auth_token
}
auth_response = TestResponse({
"headers": auth_response_headers
})
scoped_token_dict = {
'token': {
'methods': ['password'],
'expires_at': expiration,
'project': {
'id': project_dict_1['id'],
'name': project_dict_1['name'],
'domain': {
'id': domain_dict['id'],
'name': domain_dict['name']
}
},
'user': {
'id': user_dict['id'],
'name': user_dict['name'],
'domain': {
'id': domain_dict['id'],
'name': domain_dict['name']
}
},
'roles': [role_dict],
'catalog': [keystone_service, nova_service]
}
}
test_data.scoped_access_info = AccessInfo.factory(
resp=auth_response,
body=scoped_token_dict
)
unscoped_token_dict = {
'token': {
'methods': ['password'],
'expires_at': expiration,
'user': {
'id': user_dict['id'],
'name': user_dict['name'],
'domain': {
'id': domain_dict['id'],
'name': domain_dict['name']
}
},
'roles': [role_dict],
'catalog': [keystone_service]
}
}
test_data.unscoped_access_info = AccessInfo.factory(
resp=auth_response,
body=unscoped_token_dict
)
# Service Catalog
test_data.service_catalog = ServiceCatalog.factory({
'methods': ['password'],
'user': {},
'catalog': [keystone_service, nova_service],
}, token=auth_token)
return test_data

View File

@@ -25,3 +25,13 @@ ROOT_URLCONF = 'openstack_auth.tests.urls'
LOGIN_REDIRECT_URL = '/'
SECRET_KEY = 'badcafe'
OPENSTACK_API_VERSIONS = {
"identity": 2.0
}
USE_TZ = True
OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT = False
OPENSTACK_KEYSTONE_DEFAULT_DOMAIN = 'domain'

View File

@@ -1,26 +1,34 @@
import mox
from django import test
from django.conf import settings
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.core.urlresolvers import reverse
from keystoneclient import exceptions as keystone_exceptions
from keystoneclient.v2_0 import client
from keystoneclient.v2_0 import client as client_v2
from keystoneclient.v3 import client as client_v3
import mox
from .data import generate_test_data
from .data_v2 import generate_test_data as data_v2
from .data_v3 import generate_test_data as data_v3
class OpenStackAuthTests(test.TestCase):
DEFAULT_DOMAIN = settings.OPENSTACK_KEYSTONE_DEFAULT_DOMAIN
class OpenStackAuthTestsV2(test.TestCase):
def setUp(self):
super(OpenStackAuthTests, self).setUp()
super(OpenStackAuthTestsV2, self).setUp()
self.mox = mox.Mox()
self.data = generate_test_data()
self.data = data_v2()
self.ks_client_module = client_v2
endpoint = settings.OPENSTACK_KEYSTONE_URL
self.keystone_client = client.Client(endpoint=endpoint,
auth_ref=self.data.access_info)
if not hasattr(self.keystone_client, 'service_catalog'):
self.keystone_client.service_catalog = self.data.service_catalog
self.keystone_client_unscoped = self.ks_client_module.Client(
endpoint=endpoint,
auth_ref=self.data.unscoped_access_info)
self.keystone_client_scoped = self.ks_client_module.Client(
endpoint=endpoint,
auth_ref=self.data.scoped_access_info)
def tearDown(self):
self.mox.UnsetStubs()
@@ -29,31 +37,30 @@ class OpenStackAuthTests(test.TestCase):
def test_login(self):
tenants = [self.data.tenant_one, self.data.tenant_two]
user = self.data.user
sc = self.data.service_catalog
unscoped = self.data.unscoped_access_info
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': user.password,
'username': user.name}
self.mox.StubOutWithMock(client, "Client")
self.mox.StubOutWithMock(self.keystone_client.tenants, "list")
self.mox.StubOutWithMock(self.keystone_client.tokens, "authenticate")
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.tenants, "list")
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
insecure=False,
tenant_id=None).AndReturn(self.keystone_client)
self.keystone_client.tenants.list().AndReturn(tenants)
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.tenant_two.id,
insecure=False,
token=sc.get_token()['id']) \
.AndReturn(self.keystone_client)
self.keystone_client.tokens.authenticate(tenant_id=tenants[1].id,
token=sc.get_token()['id'],
username=user.name) \
.AndReturn(self.data.scoped_token)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False)\
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.tenants.list().AndReturn(tenants)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.tenant_two.id,
insecure=False,
token=unscoped.auth_token,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.mox.ReplayAll()
@@ -71,18 +78,21 @@ class OpenStackAuthTests(test.TestCase):
user = self.data.user
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': user.password,
'username': user.name}
self.mox.StubOutWithMock(client, "Client")
self.mox.StubOutWithMock(self.keystone_client.tenants, "list")
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.tenants, "list")
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
insecure=False,
tenant_id=None).AndReturn(self.keystone_client)
self.keystone_client.tenants.list().AndReturn([])
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False)\
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.tenants.list().AndReturn([])
self.mox.ReplayAll()
@@ -102,17 +112,19 @@ class OpenStackAuthTests(test.TestCase):
user = self.data.user
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': "invalid",
'username': user.name}
self.mox.StubOutWithMock(client, "Client")
self.mox.StubOutWithMock(self.ks_client_module, "Client")
exc = keystone_exceptions.Unauthorized(401)
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password="invalid",
username=user.name,
insecure=False,
tenant_id=None).AndRaise(exc)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password="invalid",
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False).AndRaise(exc)
self.mox.ReplayAll()
@@ -131,17 +143,19 @@ class OpenStackAuthTests(test.TestCase):
user = self.data.user
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': user.password,
'username': user.name}
self.mox.StubOutWithMock(client, "Client")
self.mox.StubOutWithMock(self.ks_client_module, "Client")
exc = keystone_exceptions.ClientException(500)
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
insecure=False,
tenant_id=None).AndRaise(exc)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False).AndRaise(exc)
self.mox.ReplayAll()
@@ -163,41 +177,39 @@ class OpenStackAuthTests(test.TestCase):
tenant = self.data.tenant_two
tenants = [self.data.tenant_one, self.data.tenant_two]
user = self.data.user
scoped = self.data.scoped_token
unscoped = self.data.unscoped_access_info
scoped = self.data.scoped_access_info
sc = self.data.service_catalog
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'username': user.name,
'password': user.password}
self.mox.StubOutWithMock(client, "Client")
self.mox.StubOutWithMock(self.keystone_client.tenants, "list")
self.mox.StubOutWithMock(self.keystone_client.tokens, "authenticate")
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.tenants, "list")
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
insecure=False,
tenant_id=None).AndReturn(self.keystone_client)
self.keystone_client.tenants.list().AndReturn(tenants)
self.keystone_client.tokens.authenticate(tenant_id=tenants[1].id,
token=sc.get_token()['id'],
username=user.name) \
.AndReturn(scoped)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False) \
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.tenants.list().AndReturn(tenants)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.tenant_two.id,
insecure=False,
token=unscoped.auth_token,
debug=False) \
.AndReturn(self.keystone_client_scoped)
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.tenant_two.id,
insecure=False,
token=sc.get_token()['id']) \
.AndReturn(self.keystone_client)
client.Client(endpoint=sc.url_for(),
insecure=False) \
.AndReturn(self.keystone_client)
self.keystone_client.tokens.authenticate(tenant_id=tenant.id,
token=sc.get_token()['id']) \
.AndReturn(scoped)
self.ks_client_module.Client(auth_url=sc.url_for(),
tenant_id=tenant.id,
token=scoped.auth_token,
insecure=False,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.mox.ReplayAll()
@@ -211,10 +223,7 @@ class OpenStackAuthTests(test.TestCase):
url = reverse('switch_tenants', args=[tenant.id])
scoped.tenant['id'] = self.data.tenant_two._info
sc.catalog['token']['id'] = self.data.tenant_two.id
form_data['tenant_id'] = tenant.id
scoped['token']['tenant']['id'] = self.data.tenant_two.id
if next:
form_data.update({REDIRECT_FIELD_NAME: next})
@@ -226,8 +235,9 @@ class OpenStackAuthTests(test.TestCase):
self.assertEqual(response['location'], expected_url)
else:
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
self.assertEqual(self.client.session['token']['token']['tenant']['id'],
scoped.tenant['id'])
self.assertEqual(self.client.session['token'].tenant['id'],
scoped.tenant_id)
def test_switch_with_next(self):
self.test_switch(next='/next_url')
@@ -236,33 +246,331 @@ class OpenStackAuthTests(test.TestCase):
tenant = self.data.tenant_one
tenants = [self.data.tenant_one, self.data.tenant_two]
user = self.data.user
scoped = self.data.scoped_token
unscoped = self.data.unscoped_access_info
sc = self.data.service_catalog
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'username': user.name,
'password': user.password}
self.mox.StubOutWithMock(client, "Client")
self.mox.StubOutWithMock(self.keystone_client.tenants, "list")
self.mox.StubOutWithMock(self.keystone_client.tokens, "authenticate")
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.tenants, "list")
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
insecure=False,
tenant_id=None).AndReturn(self.keystone_client)
self.keystone_client.tenants.list().AndReturn(tenants)
self.keystone_client.tokens.authenticate(tenant_id=tenants[1].id,
token=sc.get_token()['id'],
username=user.name) \
.AndReturn(scoped)
client.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.tenant_two.id,
insecure=False,
token=sc.get_token()['id']) \
.AndReturn(self.keystone_client)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False) \
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.tenants.list().AndReturn(tenants)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.tenant_two.id,
insecure=False,
token=unscoped.auth_token,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.mox.ReplayAll()
url = reverse('login')
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
response = self.client.post(url, form_data)
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
old_region = sc.get_endpoints()['compute'][0]['region']
self.assertEqual(self.client.session['services_region'], old_region)
region = sc.get_endpoints()['compute'][1]['region']
url = reverse('switch_services_region', args=[region])
form_data['region_name'] = region
if next:
form_data.update({REDIRECT_FIELD_NAME: next})
response = self.client.get(url, form_data)
if next:
expected_url = 'http://testserver%s' % next
self.assertEqual(response['location'], expected_url)
else:
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
self.assertEqual(self.client.session['services_region'], region)
def test_switch_region_with_next(self, next=None):
self.test_switch_region(next='/next_url')
class OpenStackAuthTestsV3(test.TestCase):
def setUp(self):
super(OpenStackAuthTestsV3, self).setUp()
self.mox = mox.Mox()
self.data = data_v3()
self.ks_client_module = client_v3
endpoint = settings.OPENSTACK_KEYSTONE_URL
self.keystone_client_unscoped = self.ks_client_module.Client(
endpoint=endpoint,
auth_ref=self.data.unscoped_access_info)
self.keystone_client_scoped = self.ks_client_module.Client(
endpoint=endpoint,
auth_ref=self.data.scoped_access_info)
settings.OPENSTACK_API_VERSIONS['identity'] = 3
settings.OPENSTACK_KEYSTONE_URL = "http://localhost:5000/v3"
def tearDown(self):
self.mox.UnsetStubs()
self.mox.VerifyAll()
def test_login(self):
projects = [self.data.project_one, self.data.project_two]
user = self.data.user
unscoped = self.data.unscoped_access_info
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': user.password,
'username': user.name}
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.projects,
"list")
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False)\
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.projects.list(user=user.id) \
.AndReturn(projects)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.project_two.id,
insecure=False,
token=unscoped.auth_token,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.mox.ReplayAll()
url = reverse('login')
# GET the page to set the test cookie.
response = self.client.get(url, form_data)
self.assertEqual(response.status_code, 200)
# POST to the page to log in.
response = self.client.post(url, form_data)
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
def test_no_tenants(self):
user = self.data.user
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': user.password,
'username': user.name}
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.projects,
"list")
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False)\
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.projects.list(user=user.id) \
.AndReturn([])
self.mox.ReplayAll()
url = reverse('login')
# GET the page to set the test cookie.
response = self.client.get(url, form_data)
self.assertEqual(response.status_code, 200)
# POST to the page to log in.
response = self.client.post(url, form_data)
self.assertTemplateUsed(response, 'auth/login.html')
self.assertContains(response,
'You are not authorized for any projects.')
def test_invalid_credentials(self):
user = self.data.user
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': "invalid",
'username': user.name}
self.mox.StubOutWithMock(self.ks_client_module, "Client")
exc = keystone_exceptions.Unauthorized(401)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password="invalid",
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False).AndRaise(exc)
self.mox.ReplayAll()
url = reverse('login')
# GET the page to set the test cookie.
response = self.client.get(url, form_data)
self.assertEqual(response.status_code, 200)
# POST to the page to log in.
response = self.client.post(url, form_data)
self.assertTemplateUsed(response, 'auth/login.html')
self.assertContains(response, "Invalid user name or password.")
def test_exception(self):
user = self.data.user
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'password': user.password,
'username': user.name}
self.mox.StubOutWithMock(self.ks_client_module, "Client")
exc = keystone_exceptions.ClientException(500)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False).AndRaise(exc)
self.mox.ReplayAll()
url = reverse('login')
# GET the page to set the test cookie.
response = self.client.get(url, form_data)
self.assertEqual(response.status_code, 200)
# POST to the page to log in.
response = self.client.post(url, form_data)
self.assertTemplateUsed(response, 'auth/login.html')
self.assertContains(response,
("An error occurred authenticating. Please try "
"again later."))
def test_switch(self, next=None):
project = self.data.project_two
projects = [self.data.project_one, self.data.project_two]
user = self.data.user
unscoped = self.data.unscoped_access_info
scoped = self.data.scoped_access_info
sc = self.data.service_catalog
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'username': user.name,
'password': user.password}
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.projects,
"list")
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False) \
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.projects.list(user=user.id) \
.AndReturn(projects)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.project_two.id,
insecure=False,
token=unscoped.auth_token,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.ks_client_module.Client(auth_url=sc.url_for(),
tenant_id=project.id,
token=scoped.auth_token,
insecure=False,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.mox.ReplayAll()
url = reverse('login')
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
response = self.client.post(url, form_data)
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
url = reverse('switch_tenants', args=[project.id])
scoped['project']['id'] = self.data.project_two.id
if next:
form_data.update({REDIRECT_FIELD_NAME: next})
response = self.client.get(url, form_data)
if next:
expected_url = 'http://testserver%s' % next
self.assertEqual(response['location'], expected_url)
else:
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
self.assertEqual(self.client.session['token'].project['id'],
scoped.project_id)
def test_switch_with_next(self):
self.test_switch(next='/next_url')
def test_switch_region(self, next=None):
projects = [self.data.project_one, self.data.project_two]
user = self.data.user
unscoped = self.data.unscoped_access_info
sc = self.data.service_catalog
form_data = {'region': settings.OPENSTACK_KEYSTONE_URL,
'domain': DEFAULT_DOMAIN,
'username': user.name,
'password': user.password}
self.mox.StubOutWithMock(self.ks_client_module, "Client")
self.mox.StubOutWithMock(self.keystone_client_unscoped.projects,
"list")
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
password=user.password,
username=user.name,
user_domain_name=DEFAULT_DOMAIN,
insecure=False,
debug=False) \
.AndReturn(self.keystone_client_unscoped)
self.keystone_client_unscoped.projects.list(user=user.id) \
.AndReturn(projects)
self.ks_client_module.Client(auth_url=settings.OPENSTACK_KEYSTONE_URL,
tenant_id=self.data.project_two.id,
insecure=False,
token=unscoped.auth_token,
debug=False) \
.AndReturn(self.keystone_client_scoped)
self.mox.ReplayAll()

View File

@@ -1,7 +1,6 @@
from django.conf.urls.defaults import patterns, url
from .utils import patch_middleware_get_user
from .views import switch_region
patch_middleware_get_user()

View File

@@ -4,24 +4,23 @@ import logging
from django.contrib.auth.models import AnonymousUser
from django.conf import settings
from keystoneclient.v2_0 import client as keystone_client
from keystoneclient import exceptions as keystone_exceptions
from .utils import check_token_expiration, is_ans1_token
from .utils import check_token_expiration
from .utils import get_keystone_version
from .utils import get_project_list
from .utils import is_ans1_token
LOG = logging.getLogger(__name__)
def set_session_from_user(request, user):
if is_ans1_token(user.token.id):
hashed_token = hashlib.md5(user.token.id).hexdigest()
user.token._info['token']['id'] = hashed_token
request.session['token'] = user.token
if 'token_list' not in request.session:
request.session['token_list'] = []
token_tuple = (user.endpoint, user.token.id)
request.session['token_list'].append(token_tuple)
request.session['token'] = user.token._info
request.session['user_id'] = user.id
request.session['region_endpoint'] = user.endpoint
request.session['services_region'] = user.services_region
@@ -31,17 +30,65 @@ def create_user_from_token(request, token, endpoint, services_region=None):
return User(id=token.user['id'],
token=token,
user=token.user['name'],
tenant_id=token.tenant['id'],
tenant_name=token.tenant['name'],
user_domain_id=token.user_domain_id,
project_id=token.project['id'],
project_name=token.project['name'],
domain_id=token.domain['id'],
domain_name=token.domain['name'],
enabled=True,
service_catalog=token.serviceCatalog,
roles=token.user['roles'],
roles=token.roles,
endpoint=endpoint,
services_region=services_region)
class Token(object):
"""Token object that encapsulates the auth_ref (AccessInfo)from keystone
client.
Added for maintaining backward compatibility with horizon that expects
Token object in the user object.
"""
def __init__(self, auth_ref):
# User-related attributes
user = {}
user['id'] = auth_ref.user_id
user['name'] = auth_ref.username
self.user = user
self.user_domain_id = auth_ref.user_domain_id
#Token-related attributes
self.id = auth_ref.auth_token
if is_ans1_token(self.id):
self.id = hashlib.md5(self.id).hexdigest()
self.expires = auth_ref.expires
# Project-related attributes
project = {}
project['id'] = auth_ref.project_id
project['name'] = auth_ref.project_name
self.project = project
self.tenant = self.project
# Domain-related attributes
domain = {}
domain['id'] = auth_ref.domain_id
domain['name'] = auth_ref.domain_name
self.domain = domain
if auth_ref.version == 'v2.0':
self.roles = auth_ref['user']['roles']
else:
self.roles = auth_ref['roles']
if get_keystone_version() < 3:
self.serviceCatalog = auth_ref.get('serviceCatalog', [])
else:
self.serviceCatalog = auth_ref.get('catalog', [])
class User(AnonymousUser):
""" A User class with some extra special sauce for Keystone.
"""A User class with some extra special sauce for Keystone.
In addition to the standard Django user attributes, this class also has
the following:
@@ -50,14 +97,29 @@ class User(AnonymousUser):
The Keystone token object associated with the current user/tenant.
The token object is deprecated, user auth_ref instead.
.. attribute:: tenant_id
The id of the Keystone tenant for the current user/token.
The tenant_id keyword argument is deprecated, use project_id instead.
.. attribute:: tenant_name
The name of the Keystone tenant for the current user/token.
The tenant_name keyword argument is deprecated, use project_name
instead.
.. attribute:: project_id
The id of the Keystone project for the current user/token.
.. attribute:: project_name
The name of the Keystone project for the current user/token.
.. attribute:: service_catalog
The ``ServiceCatalog`` data returned by Keystone.
@@ -66,17 +128,35 @@ class User(AnonymousUser):
A list of dictionaries containing role names and ids as returned
by Keystone.
.. attribute:: services_region
A list of non-identity service endpoint regions extracted from the
service catalog.
.. attribute:: user_domain_id
The domain id of the current user.
.. attribute:: domain_id
The id of the Keystone domain scoped for the current user/token.
"""
def __init__(self, id=None, token=None, user=None, tenant_id=None,
service_catalog=None, tenant_name=None, roles=None,
authorized_tenants=None, endpoint=None, enabled=False,
services_region=None):
services_region=None, user_domain_id=None, domain_id=None,
domain_name=None, project_id=None, project_name=None):
self.id = id
self.pk = id
self.token = token
self.username = user
self.tenant_id = tenant_id
self.tenant_name = tenant_name
self.user_domain_id = user_domain_id
self.domain_id = domain_id
self.domain_name = domain_name
self.project_id = project_id or tenant_id
self.project_name = project_name or tenant_name
self.service_catalog = service_catalog
self._services_region = services_region or \
self.default_services_region()
@@ -85,6 +165,10 @@ class User(AnonymousUser):
self.enabled = enabled
self._authorized_tenants = authorized_tenants
# List of variables to be deprecated.
self.tenant_id = self.project_id
self.tenant_name = self.project_name
def __unicode__(self):
return self.username
@@ -131,14 +215,15 @@ class User(AnonymousUser):
endpoint = self.endpoint
token = self.token
try:
client = keystone_client.Client(username=self.username,
auth_url=endpoint,
token=token.id,
insecure=insecure)
self._authorized_tenants = client.tenants.list()
self._authorized_tenants = get_project_list(
user_id=self.id,
auth_url=endpoint,
token=token.id,
insecure=insecure,
debug=settings.DEBUG)
except (keystone_exceptions.ClientException,
keystone_exceptions.AuthorizationFailure):
LOG.exception('Unable to retrieve tenant list.')
LOG.exception('Unable to retrieve project list.')
return self._authorized_tenants or []
@authorized_tenants.setter

View File

@@ -5,7 +5,9 @@ from django.contrib import auth
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth import middleware
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from keystoneclient.v2_0 import client as client_v2
from keystoneclient.v3 import client as client_v3
"""
@@ -49,7 +51,7 @@ def check_token_expiration(token):
Returns ``True`` if the token has not yet expired, otherwise ``False``.
"""
expiration = parse_datetime(token.expires)
expiration = token.expires
if settings.USE_TZ and timezone.is_naive(expiration):
# Presumes that the Keystone is using UTC.
expiration = timezone.make_aware(expiration, timezone.utc)
@@ -121,3 +123,28 @@ def is_safe_url(url, host=None):
return False
netloc = urlparse.urlparse(url)[1]
return not netloc or netloc == host
# Helper for figuring out keystone version
# Implementation will change when API version discovery is available
def get_keystone_version():
return getattr(settings, 'OPENSTACK_API_VERSIONS', {}).get('identity', 2.0)
def get_keystone_client():
if get_keystone_version() < 3:
return client_v2
else:
return client_v3
def get_project_list(*args, **kwargs):
if get_keystone_version() < 3:
client = get_keystone_client().Client(*args, **kwargs)
return client.tenants.list()
else:
auth_url = kwargs.get('auth_url', '').replace('v2.0', 'v3')
kwargs['auth_url'] = auth_url
client = get_keystone_client().Client(*args, **kwargs)
client.management_url = auth_url
return client.projects.list(user=kwargs.get('user_id'))

View File

@@ -18,11 +18,13 @@ try:
except ImportError:
from .utils import is_safe_url
from keystoneclient.v2_0 import client as keystone_client
from keystoneclient.v2_0 import client as keystone_client_v2
from keystoneclient import exceptions as keystone_exceptions
from .forms import Login
from .user import set_session_from_user, create_user_from_token
from .user import set_session_from_user, create_user_from_token, Token
from .utils import get_keystone_client
from .utils import get_keystone_version
LOG = logging.getLogger(__name__)
@@ -88,34 +90,45 @@ def delete_all_tokens(token_list):
try:
endpoint = token_tuple[0]
token = token_tuple[1]
client = keystone_client.Client(endpoint=endpoint,
token=token,
insecure=insecure)
client.tokens.delete(token=token)
if get_keystone_version() < 3:
client = keystone_client_v2.Client(endpoint=endpoint,
token=token,
insecure=insecure,
debug=settings.DEBUG)
client.tokens.delete(token=token)
else:
# FIXME: KS-client does not have delete token available
# Need to add this later when it is exposed.
pass
except keystone_exceptions.ClientException as e:
LOG.info('Could not delete token')
@login_required
def switch(request, tenant_id, redirect_field_name=REDIRECT_FIELD_NAME):
""" Switches an authenticated user from one tenant to another. """
""" Switches an authenticated user from one project to another. """
LOG.debug('Switching to tenant %s for user "%s".'
% (tenant_id, request.user.username))
insecure = getattr(settings, 'OPENSTACK_SSL_NO_VERIFY', False)
endpoint = request.user.endpoint
client = keystone_client.Client(endpoint=endpoint,
insecure=insecure)
try:
token = client.tokens.authenticate(tenant_id=tenant_id,
token=request.user.token.id)
msg = 'Tenant switch successful for user "%(username)s".' % \
if get_keystone_version() >= 3:
endpoint = endpoint.replace('v2.0', 'v3')
client = get_keystone_client().Client(tenant_id=tenant_id,
token=request.user.token.id,
auth_url=endpoint,
insecure=insecure,
debug=settings.DEBUG)
auth_ref = client.auth_ref
msg = 'Project switch successful for user "%(username)s".' % \
{'username': request.user.username}
LOG.info(msg)
except keystone_exceptions.ClientException:
msg = 'Tenant switch failed for user "%(username)s".' % \
msg = 'Project switch failed for user "%(username)s".' % \
{'username': request.user.username}
LOG.warning(msg)
token = None
auth_ref = None
LOG.exception('An error occurred while switching sessions.')
# Ensure the user-originating redirection url is safe.
@@ -124,8 +137,8 @@ def switch(request, tenant_id, redirect_field_name=REDIRECT_FIELD_NAME):
if not is_safe_url(url=redirect_to, host=request.get_host()):
redirect_to = settings.LOGIN_REDIRECT_URL
if token:
user = create_user_from_token(request, token, endpoint)
if auth_ref:
user = create_user_from_token(request, Token(auth_ref), endpoint)
set_session_from_user(request, user)
return shortcuts.redirect(redirect_to)

View File

@@ -43,7 +43,7 @@ setup(
zip_safe=False,
install_requires=[
'django >= 1.4',
'python-keystoneclient >= 0.2'
'python-keystoneclient >= 0.3'
],
tests_require=[
'mox',